1
+ import numpy as np
2
+ from scipy .signal import butter , lfilter
3
+ import pylsl
4
+ import tkinter as tk
5
+ import threading
6
+ import time
7
+ import pyautogui
8
+
9
+
10
+ class EOGPeakDetector :
11
+ def __init__ (self , blink_button , keystroke_action ):
12
+ self .inlet = None
13
+ self .sampling_rate = None
14
+ self .buffer_size = None
15
+ self .eog_data = None
16
+ self .current_index = 0
17
+ self .b , self .a = None , None
18
+ self .blink_button = blink_button
19
+ self .keystroke_action = keystroke_action
20
+ self .blink_detected = False
21
+ self .running = False
22
+
23
+ def initialize_stream (self ):
24
+ streams = pylsl .resolve_stream ('name' , 'BioAmpDataStream' )
25
+ if not streams :
26
+ print ("No LSL stream found!" )
27
+ return False
28
+
29
+ self .inlet = pylsl .StreamInlet (streams [0 ])
30
+ self .sampling_rate = int (self .inlet .info ().nominal_srate ())
31
+ print (f"Sampling rate: { self .sampling_rate } Hz" )
32
+
33
+ self .buffer_size = self .sampling_rate * 1
34
+ self .eog_data = np .zeros (self .buffer_size )
35
+ self .b , self .a = butter (4 , 10.0 / (0.5 * self .sampling_rate ), btype = 'low' )
36
+ return True
37
+
38
+ def start_detection (self ):
39
+ if not self .initialize_stream ():
40
+ print ("Failed to initialize LSL stream. Cannot start detection." )
41
+ return
42
+
43
+ print ("Starting peak detection..." )
44
+ self .running = True
45
+ while self .running :
46
+ # Pull a chunk of data from the stream
47
+ samples , _ = self .inlet .pull_chunk (timeout = 1.0 , max_samples = 1 )
48
+ if samples :
49
+ for sample in samples :
50
+ # Store the sample in the circular buffer
51
+ self .eog_data [self .current_index ] = sample [0 ]
52
+ self .current_index = (self .current_index + 1 ) % self .buffer_size
53
+
54
+ # Apply the low-pass filter
55
+ filtered_eog = lfilter (self .b , self .a , self .eog_data )
56
+
57
+ # Detect blinks in the filtered signal
58
+ self .detect_blinks (filtered_eog )
59
+
60
+ def stop_detection (self ):
61
+ # Stop the detection loop
62
+ print ("Stopping peak detection..." )
63
+ self .running = False
64
+
65
+ def detect_blinks (self , filtered_eog ):
66
+ mean_signal = np .mean (filtered_eog )
67
+ stdev_signal = np .std (filtered_eog )
68
+ threshold = mean_signal + (2 * stdev_signal ) # Threshold for detecting blinks
69
+
70
+ # Define the window size (1 second of data)
71
+ window_size = self .sampling_rate
72
+ start_index = self .current_index - window_size
73
+ if start_index < 0 :
74
+ start_index = 0
75
+
76
+ # Extract the window of data for peak detection
77
+ filtered_window = filtered_eog [start_index :self .current_index ]
78
+ peaks = self .detect_peaks (filtered_window , threshold )
79
+
80
+ if peaks and not self .blink_detected :
81
+ # Blink detected, trigger actions
82
+ print ("Blink Detected!" )
83
+ self .update_button_color ()
84
+ self .keystroke_action ()
85
+ self .blink_detected = True
86
+ elif not peaks :
87
+ # Reset blink state when no peaks are detected
88
+ self .blink_detected = False
89
+
90
+ def detect_peaks (self , signal , threshold ):
91
+ # Detect peaks in the signal that exceed the threshold
92
+ peaks = []
93
+ for i in range (1 , len (signal ) - 1 ):
94
+ if signal [i ] > signal [i - 1 ] and signal [i ] > signal [i + 1 ] and signal [i ] > threshold :
95
+ peaks .append (i )
96
+ return peaks
97
+
98
+ def update_button_color (self ):
99
+ # Temporarily change the button color to indicate a blink
100
+ self .blink_button .config (bg = "green" )
101
+ self .blink_button .update ()
102
+ time .sleep (0.1 )
103
+ self .blink_button .config (bg = "SystemButtonFace" )
104
+
105
+ def stop_action (detector ):
106
+ # Action for the Quit button
107
+ print ("Quit button pressed. Exiting program." )
108
+ detector .stop_detection ()
109
+ exit ()
110
+
111
+ def keystroke_action ():
112
+ # Action to simulate a spacebar key press
113
+ print ("Spacebar pressed." )
114
+ pyautogui .press ('space' )
115
+
116
+ def start_action (detector ):
117
+ # Action for the Start button
118
+ print ("Start button pressed. Starting the program." )
119
+ detection_thread = threading .Thread (target = detector .start_detection , daemon = True )
120
+ detection_thread .start ()
121
+
122
+ def create_popup ():
123
+ # Create the main GUI window
124
+ popup = tk .Tk ()
125
+ popup .geometry ("350x80" ) # Set the window size
126
+ popup .overrideredirect (1 ) # Remove title bar
127
+ popup .attributes ("-topmost" , True ) # Keep the window on top
128
+
129
+ # Create a single horizontal frame for buttons
130
+ horizontal_frame = tk .Frame (popup )
131
+ horizontal_frame .pack (expand = True , pady = 10 )
132
+
133
+ # Add Blink Detected button
134
+ blink_button = tk .Button (horizontal_frame , text = "Blink Detected" , width = 12 )
135
+ blink_button .pack (side = tk .LEFT , padx = 10 )
136
+
137
+ # Add Start button
138
+ start_button = tk .Button (horizontal_frame , text = "Start" , width = 8 )
139
+ start_button .pack (side = tk .LEFT , padx = 10 )
140
+
141
+ # Add Quit button
142
+ stop_button = tk .Button (horizontal_frame , text = "Quit" , width = 8 )
143
+ stop_button .pack (side = tk .LEFT , padx = 10 )
144
+
145
+ # Initialize the EOG peak detector
146
+ detector = EOGPeakDetector (blink_button , keystroke_action )
147
+
148
+ # Link Start and Stop buttons to their respective actions
149
+ start_button .config (command = lambda : start_action (detector ))
150
+ stop_button .config (command = lambda : stop_action (detector ))
151
+
152
+ popup .mainloop ()
153
+
154
+ # Create the pop-up window and run the application
155
+ create_popup ()
0 commit comments