1
1
import numpy as np
2
- from numpy import hamming , hanning
2
+ from collections import deque
3
3
from PyQt5 .QtWidgets import QApplication , QVBoxLayout , QHBoxLayout , QMainWindow , QWidget
4
4
from PyQt5 .QtCore import Qt
5
5
from pyqtgraph import PlotWidget
@@ -25,8 +25,8 @@ def __init__(self):
25
25
self .eeg_plot_widget .setBackground ('w' )
26
26
self .eeg_plot_widget .showGrid (x = True , y = True )
27
27
self .eeg_plot_widget .setLabel ('bottom' , 'EEG Plot' )
28
- self .eeg_plot_widget .setYRange (0 , 5000 , padding = 0 )
29
- self .eeg_plot_widget .setXRange (0 , 10 , padding = 0 )
28
+ self .eeg_plot_widget .setYRange (- 5000 , 5000 , padding = 0 )
29
+ self .eeg_plot_widget .setXRange (0 , 2 , padding = 0 )
30
30
self .eeg_plot_widget .setMouseEnabled (x = False , y = True ) # Disable zoom
31
31
self .main_layout .addWidget (self .eeg_plot_widget )
32
32
@@ -72,13 +72,17 @@ def __init__(self):
72
72
self .sampling_rate = int (self .inlet .info ().nominal_srate ())
73
73
print (f"Sampling rate: { self .sampling_rate } Hz" )
74
74
75
- # Data and buffers
76
- self .buffer_size = self .sampling_rate * 10 # Fixed-size buffer for 10 seconds
77
- self .eeg_data = np .zeros (self .buffer_size ) # Fixed-size array for circular buffer
78
- self .time_data = np .linspace (0 , 10 , self .buffer_size ) # Fixed time array for plotting
79
- self .current_index = 0 # Index for overwriting data
75
+ # Data and Buffers
76
+ self .filter_buffer_size = 30 # Minimum length for filtfilt
77
+ self .filter_buffer = deque (maxlen = self .filter_buffer_size )
78
+ self .one_second_buffer = deque (maxlen = self .sampling_rate ) # 1-second buffer
79
+ self .buffer_size = self .sampling_rate * 10
80
+ self .moving_window_size = self .sampling_rate * 2 # 2-second window
81
+
82
+ self .eeg_data = np .zeros (self .buffer_size )
83
+ self .time_data = np .linspace (0 , 10 , self .buffer_size )
84
+ self .current_index = 0
80
85
81
- # Filters
82
86
self .b_notch , self .a_notch = iirnotch (50 , 30 , self .sampling_rate )
83
87
self .b_band , self .a_band = butter (4 , [0.5 / (self .sampling_rate / 2 ), 48.0 / (self .sampling_rate / 2 )], btype = 'band' )
84
88
@@ -91,43 +95,48 @@ def __init__(self):
91
95
self .fft_curve = self .fft_plot .plot (pen = pg .mkPen ('r' , width = 1 )) # FFT Colour is red
92
96
93
97
def update_plot (self ):
94
- samples , _ = self .inlet .pull_chunk (timeout = 0.0 , max_samples = 30 )
98
+ samples , _ = self .inlet .pull_chunk (timeout = 0.0 )
95
99
if samples :
96
100
for sample in samples :
97
- # Overwrite the oldest data point in the buffer
98
- self .eeg_data [self .current_index ] = sample [0 ]
99
- self .current_index = (self .current_index + 1 ) % self .buffer_size # Circular increment
100
-
101
- if self .current_index >= self .buffer_size :
102
- plot_data = self .eeg_data
103
- else :
104
- plot_data = np .concatenate ((self .eeg_data [self .current_index :], self .eeg_data [:self .current_index ]))
105
-
106
- # Apply filters to the full data for EEG plot
107
- filtered_eeg = filtfilt (self .b_notch , self .a_notch , plot_data )
108
- filtered_eeg = filtfilt (self .b_band , self .a_band , filtered_eeg )
109
-
110
- # Update the EEG plot with the filtered data
111
- self .eeg_curve .setData (self .time_data , filtered_eeg )
112
-
113
- # Perform FFT on the latest 1-second slice
114
- latest_data = filtered_eeg [- self .sampling_rate :]
115
- window = np .hanning (len (latest_data ))
116
- filtered_eeg_windowed = latest_data * window
117
-
118
- # Apply zero-padding
119
- zero_padded_length = 512
120
- filtered_eeg_windowed_padded = np .pad (filtered_eeg_windowed , (0 , zero_padded_length - len (filtered_eeg_windowed )), 'constant' )
121
-
122
- eeg_fft = np .abs (fft (filtered_eeg_windowed_padded ))[:len (filtered_eeg_windowed_padded ) // 2 ]
123
- eeg_fft /= len (filtered_eeg_windowed_padded ) # Normalize FFT
124
- freqs = np .fft .fftfreq (len (filtered_eeg_windowed_padded ), 1 / self .sampling_rate )[:len (filtered_eeg_windowed_padded ) // 2 ]
125
-
126
- # Update FFT plot
127
- self .fft_curve .setData (freqs , eeg_fft )
128
-
129
- brainwave_power = self .calculate_brainwave_power (eeg_fft , freqs )
130
- self .brainwave_bars .setOpts (height = brainwave_power )
101
+ raw_point = sample [0 ]
102
+ self .filter_buffer .append (raw_point )
103
+
104
+ # Apply the filters if the buffer is full
105
+ if len (self .filter_buffer ) == self .filter_buffer_size :
106
+ notch_filtered = filtfilt (self .b_notch , self .a_notch , list (self .filter_buffer ))[- 1 ]
107
+ band_filtered = filtfilt (self .b_band , self .a_band , list (self .filter_buffer ))[- 1 ]
108
+ else :
109
+ continue
110
+
111
+ self .eeg_data [self .current_index ] = band_filtered # Plot the filtered data
112
+ self .current_index = (self .current_index + 1 ) % self .buffer_size
113
+
114
+ if self .current_index == 0 :
115
+ plot_data = self .eeg_data
116
+ else :
117
+ plot_data = np .concatenate ((self .eeg_data [self .current_index :], self .eeg_data [:self .current_index ]))
118
+
119
+ recent_data = plot_data [- self .moving_window_size :]
120
+ recent_time = np .linspace (0 , len (recent_data ) / self .sampling_rate , len (recent_data ))
121
+ self .eeg_curve .setData (recent_time , recent_data )
122
+
123
+ self .one_second_buffer .append (band_filtered ) # Add the filtered point to the 1-second buffer
124
+ if len (self .one_second_buffer ) == self .sampling_rate : # Process FFT and brainwave power
125
+ self .process_fft_and_brainpower ()
126
+ self .one_second_buffer .clear ()
127
+
128
+ def process_fft_and_brainpower (self ):
129
+ window = np .hanning (len (self .one_second_buffer )) # Apply Hanning window to the buffer
130
+ buffer_windowed = np .array (self .one_second_buffer ) * window
131
+
132
+ # Perform FFT
133
+ fft_result = np .abs (fft (buffer_windowed ))[:len (buffer_windowed ) // 2 ]
134
+ fft_result /= len (buffer_windowed )
135
+ freqs = np .fft .fftfreq (len (buffer_windowed ), 1 / self .sampling_rate )[:len (buffer_windowed ) // 2 ]
136
+ self .fft_curve .setData (freqs , fft_result )
137
+
138
+ brainwave_power = self .calculate_brainwave_power (fft_result , freqs )
139
+ self .brainwave_bars .setOpts (height = brainwave_power )
131
140
132
141
def calculate_brainwave_power (self , fft_data , freqs ):
133
142
delta_power = np .sum (fft_data [(freqs >= 0.5 ) & (freqs <= 4 )] ** 2 )
@@ -137,13 +146,9 @@ def calculate_brainwave_power(self, fft_data, freqs):
137
146
gamma_power = np .sum (fft_data [(freqs >= 30 ) & (freqs <= 45 )] ** 2 )
138
147
139
148
return [delta_power , theta_power , alpha_power , beta_power , gamma_power ]
140
-
149
+
141
150
if __name__ == "__main__" :
142
151
app = QApplication (sys .argv )
143
152
window = EEGMonitor ()
144
153
window .show ()
145
- sys .exit (app .exec_ ())
146
-
147
- # PSD is calculated now(Previously ASD is calculated)
148
- # FFT is normalized to the len(filtered_eeg_window_padded)
149
- # Hanning window is used now - as it is designed to minimize spectral leakage while hamming may exhibit more spectral leakage.
154
+ sys .exit (app .exec_ ())
0 commit comments