-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
254 lines (213 loc) · 9.18 KB
/
server.py
File metadata and controls
254 lines (213 loc) · 9.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
import numpy as np
import socket
import json
import struct
import time
import threading
from typing import Tuple, Optional
class ArrayServer:
def __init__(self, host='0.0.0.0', port=9999):
self.host = host
self.port = port
self.server_socket = None
self.client_socket = None
self.is_running = False
self.server_thread = None
self._last_array = None
self._last_array_prev = None
self._last_position = None
def start(self):
"""Start the server in a separate thread"""
self.server_thread = threading.Thread(target=self._run_server)
self.is_running = True
self.server_thread.start()
def stop(self):
"""Stop the server"""
self.is_running = False
if self.client_socket:
self.client_socket.close()
if self.server_socket:
self.server_socket.close()
if self.server_thread:
self.server_thread.join()
def send_array(self, array):
"""Send a numpy array to the connected client"""
if not isinstance(array, np.ndarray):
array = np.array(array, dtype=np.float32)
if array.dtype != np.float32:
array = array.astype(np.float32)
self._last_array = array
def set_fingertip_callback(self, callback):
"""Set a callback function to be called when new fingertip position is received"""
self._fingertip_callback = callback
def _run_server(self):
"""Internal method to run the server"""
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(1)
print(f"Server listening on {self.host}:{self.port}")
while self.is_running:
try:
self.client_socket, address = self.server_socket.accept()
print(f"Connection from {address}")
self._handle_client()
except Exception as e:
if self.is_running: # Only print error if we're still supposed to be running
print(f"Error in server: {e}")
finally:
if self.client_socket:
self.client_socket.close()
def _handle_client(self):
"""Handle communication with a connected client"""
receive_thread = threading.Thread(target=self._receive_fingertip_data)
receive_thread.start()
try:
while self.is_running:
if self._last_array is not None:
if self._last_array_prev is None or np.any(self._last_array != self._last_array_prev):
self._send_array_data(self._last_array)
self._last_array_prev = self._last_array
time.sleep(0.1) # 10 FPS
except Exception as e:
print(f"Error handling client: {e}")
finally:
receive_thread.join()
def get_fingertip_position(self, name="index"):
if self._last_position is None:
# return np.zeros(3)
return np.array([0,0,0.4])
try:
if name == "thumb":
fingertip = self._last_position[19] # thumb fingertip
elif name == "index":
fingertip = self._last_position[20] # index fingertip
elif name == "middle":
fingertip = self._last_position[21] # middle fingertip
elif name == "ring":
fingertip = self._last_position[22] # ring fingertip
elif name == "pinky":
fingertip = self._last_position[23] # pinky fingertip
except Exception as e:
return np.array([0,0,0.4])
return fingertip
def get_world_fingertip_position(self):
if self._last_position is None:
return np.array([0,0,0.4])
return self._last_position[-1]
def get_thumb_to_index_distance(self):
if self._last_position is None:
return 0
try:
index_joints = self._last_position[[2, 6, 7, 8, 20]]
thumbtip = self._last_position[19]
distances = np.linalg.norm(index_joints - thumbtip, axis=1)
except Exception as e:
return 0
return np.min(distances)
def get_finger_joints(self):
return self._last_position
def _receive_fingertip_data(self):
"""Receive 3D position data from client"""
while self.is_running:
try:
# Read exactly 4 bytes for the length prefix
length_prefix = self._recv_all(4)
if not length_prefix:
print("No length prefix received")
break
# Convert 4 bytes to integer (big-endian)
message_length = int.from_bytes(length_prefix, byteorder='big', signed=False)
# print(message_length)
if message_length <= 0 or message_length > 1024: # Sanity check
print(f"Invalid message length: {message_length}")
continue
# Receive the complete message
message_data = self._recv_all(message_length)
if not message_data:
print("No message data received")
continue
# Decode the message to a string
message_str = message_data.decode('utf-8').strip()
# Parse the string into a numpy array
try:
position_list = list(map(float, message_str.split(',')))
self._last_position = np.array(position_list).reshape(-1, 3)
# print(f"Successfully parsed positions: {self._last_position}")
if hasattr(self, '_position_callback') and self._position_callback:
self._position_callback(self._last_position)
except ValueError as e:
print(f"Value error: {e}")
print(f"Problematic message: {message_str}")
continue
except Exception as e:
if self.is_running:
print(f"Error receiving position data: {e}")
print(f"Error type: {type(e)}")
import traceback
traceback.print_exc()
continue # Continue instead of break to maintain connection
def _recv_all(self, n: int) -> Optional[bytes]:
"""Helper method to receive exactly n bytes"""
data = bytearray()
bytes_remaining = n
start_time = time.time()
while bytes_remaining > 0 and (time.time() - start_time) < 5: # 5 second timeout
try:
chunk = self.client_socket.recv(min(bytes_remaining, 4096))
if not chunk:
print(f"Connection closed by client. Received {len(data)} of {n} bytes")
return None
data.extend(chunk)
bytes_remaining = n - len(data)
except socket.error as e:
print(f"Socket error in _recv_all: {e}")
return None
if bytes_remaining > 0:
print(f"Timeout while receiving data. Got {len(data)} of {n} bytes")
return None
return bytes(data)
def _send_array_data(self, array):
"""Internal method to send array data"""
try:
# Get array shape and create header
shape = array.shape
header = {
'rows': shape[0],
'cols': shape[1],
'dtype': str(array.dtype)
}
# Convert header to JSON and get its length
header_json = json.dumps(header).encode('utf-8')
header_length = len(header_json)
# Send header length first (as 4 bytes)
self.client_socket.send(struct.pack('!I', header_length))
# Send header
self.client_socket.send(header_json)
# Send array data
self.client_socket.sendall(array.tobytes())
except Exception as e:
raise Exception(f"Error sending array: {e}")
# Example usage functions
def generate_wave_pattern(size=(41, 41), frame=0):
"""Generate a sample wave pattern"""
x = np.linspace(-4, 4, size[0])
y = np.linspace(-4, 4, size[1])
X, Y = np.meshgrid(x, y)
t = frame * 0.1
wave1 = np.sin(np.sqrt(X**2 + Y**2) + t)
wave2 = np.sin(X + t) * np.cos(Y + t)
combined = (wave1 + wave2) / 2
normalized = (combined - combined.min()) / (combined.max() - combined.min())
return normalized.astype(np.float32)
# def on_fingertip_update(position):
# print(f"Received fingertip position: x={position[0]:.2f}, y={position[1]:.2f}")
if __name__ == "__main__":
server = ArrayServer()
# server.set_fingertip_callback(on_fingertip_update)
server.start()
frame = 0
while True:
array = generate_wave_pattern(frame=frame)
server.send_array(array)
frame += 1
time.sleep(0.1) # 10 FPS