1
+ import tkinter as tk
2
+ from tkinter import PhotoImage
3
+ import threading
4
+ import pyautogui
1
5
import numpy as np
2
6
from scipy .signal import butter , lfilter
3
7
import pylsl
4
- import tkinter as tk
5
- import threading
6
8
import time
7
- import pyautogui
8
-
9
9
10
10
class EOGPeakDetector :
11
- def __init__ (self , blink_button , keystroke_action ):
11
+ def __init__ (self , blink_button , keystroke_action , connect_button ):
12
12
self .inlet = None
13
13
self .sampling_rate = None
14
14
self .buffer_size = None
@@ -17,163 +17,176 @@ def __init__(self, blink_button, keystroke_action):
17
17
self .b , self .a = None , None
18
18
self .blink_button = blink_button
19
19
self .keystroke_action = keystroke_action
20
+ self .connect_button = connect_button
20
21
self .blink_detected = False
21
22
self .running = False
23
+ self .connected = False
24
+ self .last_blink_time = 0
25
+ self .refractory_period = 0.2
26
+ self .stop_threads = False
22
27
23
28
def initialize_stream (self ):
29
+ print ("Attempting to connect to LSL stream..." )
24
30
streams = pylsl .resolve_stream ('name' , 'BioAmpDataStream' )
25
31
if not streams :
26
32
print ("No LSL stream found!" )
33
+ self .connected = False
27
34
return False
28
-
35
+
29
36
self .inlet = pylsl .StreamInlet (streams [0 ])
30
37
self .sampling_rate = int (self .inlet .info ().nominal_srate ())
31
38
print (f"Sampling rate: { self .sampling_rate } Hz" )
32
-
39
+
33
40
self .buffer_size = self .sampling_rate * 1
34
41
self .eog_data = np .zeros (self .buffer_size )
35
42
self .b , self .a = butter (4 , 10.0 / (0.5 * self .sampling_rate ), btype = 'low' )
43
+ self .connected = True
44
+ print ("LSL stream connected successfully." )
36
45
return True
37
46
38
47
def start_detection (self ):
39
- if not self .initialize_stream ():
40
- print ("Failed to initialize LSL stream. Cannot start detection." )
41
- return
42
-
43
48
print ("Starting peak detection..." )
44
49
self .running = True
45
50
while self .running :
46
- # Pull a chunk of data from the stream
47
- samples , _ = self .inlet .pull_chunk (timeout = 1.0 , max_samples = 1 )
48
- if samples :
49
- for sample in samples :
50
- # Store the sample in the circular buffer
51
- self .eog_data [self .current_index ] = sample [0 ]
52
- self .current_index = (self .current_index + 1 ) % self .buffer_size
53
-
54
- # Apply the low-pass filter
55
- filtered_eog = lfilter (self .b , self .a , self .eog_data )
56
-
57
- # Detect blinks in the filtered signal
58
- self .detect_blinks (filtered_eog )
59
-
60
- def quit_detection (self ):
61
- # Quit the detection loop
62
- print ("Quitting peak detection..." )
63
- self .running = False
51
+ try :
52
+ samples , _ = self .inlet .pull_chunk (timeout = 1.0 , max_samples = 1 )
53
+ if samples :
54
+ for sample in samples :
55
+ self .eog_data [self .current_index ] = sample [0 ]
56
+ self .current_index = (self .current_index + 1 ) % self .buffer_size
57
+
58
+ filtered_eog = lfilter (self .b , self .a , self .eog_data )
59
+ self .detect_blinks (filtered_eog )
60
+ except Exception as e :
61
+ print (f"Error in detection: { e } " )
62
+ break
64
63
65
64
def stop_detection (self ):
66
- # Stop the detection loop
67
65
print ("Stopping peak detection..." )
68
66
self .running = False
69
67
70
68
def detect_blinks (self , filtered_eog ):
71
69
mean_signal = np .mean (filtered_eog )
72
70
stdev_signal = np .std (filtered_eog )
73
- threshold = mean_signal + (2 * stdev_signal ) # Threshold for detecting blinks
71
+ threshold = mean_signal + (1.7 * stdev_signal )
74
72
75
- # Define the window size (1 second of data)
76
73
window_size = self .sampling_rate
77
74
start_index = self .current_index - window_size
78
75
if start_index < 0 :
79
76
start_index = 0
80
77
81
- # Extract the window of data for peak detection
82
78
filtered_window = filtered_eog [start_index :self .current_index ]
83
79
peaks = self .detect_peaks (filtered_window , threshold )
84
80
85
- if peaks and not self .blink_detected :
86
- # Blink detected, trigger actions
87
- print ("Blink Detected!" )
88
- self .update_button_color ()
89
- self .keystroke_action ()
90
- self .blink_detected = True
91
- elif not peaks :
92
- # Reset blink state when no peaks are detected
93
- self .blink_detected = False
81
+ current_time = time .time ()
82
+ if peaks and (current_time - self .last_blink_time > self .refractory_period ):
83
+ self .last_blink_time = current_time
84
+ print (f"Blink detected at index { peaks [0 ]} . Time: { current_time } " )
85
+ self .trigger_action ()
94
86
95
87
def detect_peaks (self , signal , threshold ):
96
- # Detect peaks in the signal that exceed the threshold
97
88
peaks = []
98
89
for i in range (1 , len (signal ) - 1 ):
99
90
if signal [i ] > signal [i - 1 ] and signal [i ] > signal [i + 1 ] and signal [i ] > threshold :
100
- peaks .append (i )
91
+ if not peaks or (i - peaks [- 1 ] > self .sampling_rate * self .refractory_period ):
92
+ peaks .append (i )
101
93
return peaks
102
94
95
+ def trigger_action (self ):
96
+ if not self .blink_detected :
97
+ self .blink_detected = True
98
+ print ("Triggering action..." )
99
+ self .keystroke_action () # Press spacebar
100
+ self .update_button_color ()
101
+ threading .Timer (self .refractory_period , self .reset_blink_detected ).start ()
102
+
103
+ def reset_blink_detected (self ):
104
+ self .blink_detected = False
105
+
103
106
def update_button_color (self ):
104
- # Temporarily change the button color to indicate a blink
105
- self .blink_button .config (bg = "green" )
107
+ self .blink_button .config (bg = "#ADD8E6" )
106
108
self .blink_button .update ()
107
- time .sleep (0.1 )
108
- self .blink_button .config (bg = "SystemButtonFace" )
109
+ self .blink_button .after (100 , lambda : self .blink_button .config (bg = "SystemButtonFace" ))
109
110
110
111
def quit_action (detector ):
111
- # Action for the Quit button
112
112
print ("Quit button pressed. Exiting program." )
113
- detector .quit_detection ()
114
- exit ()
113
+ detector .stop_threads = True
114
+ detector .stop_detection ()
115
+ popup .quit ()
116
+ popup .destroy ()
115
117
116
118
def keystroke_action ():
117
- # Action to simulate a spacebar key press
118
- print ("Spacebar pressed." )
119
+ print ("Spacebar pressed!" )
119
120
pyautogui .press ('space' )
120
121
121
- def start_action (detector ):
122
- # Action for the Start button
123
- print ("Start button pressed. Starting the program." )
124
- detection_thread = threading .Thread (target = detector .start_detection , daemon = True )
125
- detection_thread .start ()
126
-
127
- def stop_action (detector ):
128
- # Action for the Start button
129
- print ("Stop button pressed. Stopping the program." )
130
- detection_thread = threading .Thread (target = detector .stop_detection , daemon = True )
131
- detection_thread .stop ()
132
-
133
- def toggle_action (detector , start_button ):
134
- # Toggle between Start and Stop
135
- if start_button .cget ("text" ) == "Start" :
136
- print ("Start button pressed. Starting the program." )
137
- start_button .config (text = "Stop" )
138
- detector .running = True
122
+ def connect_start_stop_action (detector , connect_button ):
123
+ if not detector .connected :
124
+ print ("Connect button pressed. Starting connection in a new thread." )
125
+ threading .Thread (target = connect_to_stream , args = (detector , connect_button ), daemon = True ).start ()
126
+ elif not detector .running :
127
+ print ("Start button pressed. Starting blink detection." )
128
+ connect_button .config (text = "Stop" , bg = "#FF6961" )
139
129
detection_thread = threading .Thread (target = detector .start_detection , daemon = True )
140
130
detection_thread .start ()
141
131
else :
142
- print ("Stop button pressed. Stopping the program ." )
143
- start_button .config (text = "Start" )
132
+ print ("Stop button pressed. Stopping blink detection ." )
133
+ connect_button .config (text = "Start" , bg = "#90EE90 " )
144
134
detector .stop_detection ()
145
135
136
+ def connect_to_stream (detector , connect_button ):
137
+ if detector .initialize_stream ():
138
+ connect_button .config (text = "Start" , bg = "#90EE90" )
139
+ else :
140
+ print ("Failed to connect to LSL stream." )
141
+
146
142
def create_popup ():
147
- # Create the main GUI window
143
+ global popup
148
144
popup = tk .Tk ()
149
- popup .geometry ("350x80" ) # Set the window size
150
- popup .overrideredirect (1 ) # Remove title bar
151
- popup .attributes ("-topmost" , True ) # Keep the window on top
145
+ popup .geometry ("300x120" )
146
+ popup .overrideredirect (1 )
147
+ popup .resizable (False , False )
148
+ popup .attributes ("-topmost" , True )
149
+
150
+ title_bar = tk .Frame (popup , bg = "#6082B6" , relief = "raised" , bd = 0 , height = 20 )
151
+ title_bar .pack (fill = tk .X , side = tk .TOP )
152
+
153
+ title_label = tk .Label (title_bar , text = " EOG Keystroke Emulator" , bg = "#6082B6" , fg = "white" , font = ("Arial" , 10 ))
154
+ title_label .pack (side = tk .LEFT , padx = 5 )
155
+
156
+ def start_move (event ):
157
+ popup .x = event .x
158
+ popup .y = event .y
159
+
160
+ def move (event ):
161
+ x = popup .winfo_pointerx () - popup .x
162
+ y = popup .winfo_pointery () - popup .y
163
+ popup .geometry (f"+{ x } +{ y } " )
164
+
165
+ title_bar .bind ("<Button-1>" , start_move )
166
+ title_bar .bind ("<B1-Motion>" , move )
167
+ title_label .bind ("<Button-1>" , start_move )
168
+ title_label .bind ("<B1-Motion>" , move )
152
169
153
- # Create a single horizontal frame for buttons
154
170
horizontal_frame = tk .Frame (popup )
155
171
horizontal_frame .pack (expand = True , pady = 10 )
156
172
157
- # Add Blink Detected button
158
- blink_button = tk .Button (horizontal_frame , text = "Blink Detected" , width = 12 )
173
+ eye_icon = PhotoImage (file = "C:\\ Users\\ PAYAL\\ Desktop\\ Chords-Python\\ applications\\ media\\ icons8-eye-30.png" )
174
+
175
+ blink_button = tk .Button (horizontal_frame , image = eye_icon , width = 70 , height = 38 , bg = "#FFFFFF" )
176
+ blink_button .image = eye_icon
159
177
blink_button .pack (side = tk .LEFT , padx = 10 )
160
178
161
- # Add Start/Stop button
162
- start_button = tk .Button (horizontal_frame , text = "Start" , width = 8 )
163
- start_button .pack (side = tk .LEFT , padx = 10 )
179
+ connect_button = tk .Button (horizontal_frame , text = "Connect" , width = 7 , bg = "#CBC3E3" )
180
+ connect_button .pack (side = tk .LEFT , padx = 10 )
164
181
165
- # Add Quit button
166
- quit_button = tk .Button (horizontal_frame , text = "Quit" , width = 8 )
182
+ quit_button = tk .Button (horizontal_frame , text = "Quit" , width = 7 , bg = "#71797E" , fg = "#FFFFFF" )
167
183
quit_button .pack (side = tk .LEFT , padx = 10 )
168
184
169
- # Initialize the EOG peak detector
170
- detector = EOGPeakDetector (blink_button , keystroke_action )
185
+ detector = EOGPeakDetector (blink_button , keystroke_action , connect_button )
171
186
172
- # Link Start/Stop button and Quit button to their respective actions
173
- start_button .config (command = lambda : toggle_action (detector , start_button ))
187
+ connect_button .config (command = lambda : connect_start_stop_action (detector , connect_button ))
174
188
quit_button .config (command = lambda : quit_action (detector ))
175
189
176
190
popup .mainloop ()
177
191
178
- # Create the pop-up window and run the application
179
192
create_popup ()
0 commit comments