Skip to content

Commit f78b937

Browse files
committed
address comment
1 parent 74f797b commit f78b937

File tree

2 files changed

+58
-34
lines changed

2 files changed

+58
-34
lines changed

src/aks-agent/HISTORY.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ Pending
1313
+++++++
1414

1515
1.0.0b14
16-
* Fix: chunk the data for output to fix the eror on MacOS "BlockingIOError: [Errno 35] write could not complete without blocking"
16+
* Fix: chunk the data for output to fix the error on macOS "BlockingIOError: [Errno 35] write could not complete without blocking"
1717
* Fix: gracefully handle the connection reset error
1818

1919
1.0.0b13

src/aks-agent/azext_aks_agent/agent/k8s/pod_exec.py

Lines changed: 57 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@
3333

3434
logger = get_logger(__name__)
3535

36+
# WebSocket buffer size - matches Kubernetes client-go implementation
37+
# Reference: https://github.com/kubernetes/client-go/blob/master/transport/websocket/roundtripper.go#L67
38+
WEBSOCKET_BUFFER_SIZE = 32 * 1024 # 32 KiB
39+
3640

3741
def _get_terminal_size() -> Tuple[int, int]:
3842
"""
@@ -165,23 +169,33 @@ def _set_terminal_raw_mode():
165169
return fd, old_settings
166170

167171

168-
def _restore_terminal_mode(fd_or_handle, old_settings):
172+
def _restore_terminal_mode(fd_or_handle, old_settings, windows_console_state=None):
169173
"""
170174
Restore terminal to original mode.
171175
172176
Args:
173177
fd_or_handle: File descriptor (Unix) or handle (Windows)
174178
old_settings: Original terminal settings to restore
179+
windows_console_state: Windows console state tuple (output_cp, input_cp, stdout_mode, stdout_handle)
175180
"""
176181
try:
177182
if IS_WINDOWS:
178183
import ctypes
179184
kernel32 = ctypes.windll.kernel32
185+
186+
# Restore terminal raw mode
180187
kernel32.SetConsoleMode(fd_or_handle, old_settings)
188+
189+
# Restore Windows console settings (code pages and VT100 mode)
190+
if windows_console_state is not None:
191+
output_cp, input_cp, stdout_mode, stdout_handle = windows_console_state
192+
kernel32.SetConsoleOutputCP(output_cp)
193+
kernel32.SetConsoleCP(input_cp)
194+
kernel32.SetConsoleMode(stdout_handle, stdout_mode)
181195
else:
182196
termios.tcsetattr(fd_or_handle, termios.TCSADRAIN, old_settings)
183-
except (NameError, OSError, IOError):
184-
pass
197+
except (NameError, OSError, IOError) as e:
198+
logger.debug("Failed to restore terminal mode: %s", e)
185199

186200

187201
def _is_blocking_error(error):
@@ -196,7 +210,7 @@ def _is_blocking_error(error):
196210
"""
197211
import errno
198212
err_code = getattr(error, 'errno', None) or getattr(error, 'winerror', None)
199-
return err_code in (errno.EAGAIN, errno.EWOULDBLOCK, 35) if err_code else False
213+
return err_code in (errno.EAGAIN, errno.EWOULDBLOCK) if err_code else False
200214

201215

202216
def _is_connection_reset_error(error):
@@ -212,9 +226,9 @@ def _is_connection_reset_error(error):
212226
"""
213227
import errno
214228
err_code = getattr(error, 'errno', None) or getattr(error, 'winerror', None)
215-
# Unix: errno 104 (ECONNRESET), errno 32 (EPIPE)
229+
# Unix: ECONNRESET, EPIPE
216230
# Windows: WinError 10054 (WSAECONNRESET)
217-
return err_code in (errno.ECONNRESET, errno.EPIPE, 104, 32, 10054) if err_code else False
231+
return err_code in (errno.ECONNRESET, errno.EPIPE, 10054) if err_code else False
218232

219233

220234
def _heartbeat_worker(exec_stream, stop_event):
@@ -278,10 +292,17 @@ def exec_command_in_pod(pod_name: str, command: List[str], # pylint: disable=to
278292
original_sigwinch = None
279293
fd = None
280294
fl = None
295+
cleanup_done = False
296+
windows_console_state = None
281297

282298
def cleanup():
283299
"""Cleanup function to ensure proper resource cleanup."""
284-
nonlocal terminal_state, original_sigwinch, fd, fl, resp
300+
nonlocal terminal_state, original_sigwinch, fd, fl, resp, cleanup_done, windows_console_state
301+
302+
# Prevent duplicate cleanup
303+
if cleanup_done:
304+
return
305+
cleanup_done = True
285306

286307
# Restore signal handler
287308
if original_sigwinch and not IS_WINDOWS:
@@ -290,9 +311,9 @@ def cleanup():
290311
except (ValueError, OSError):
291312
pass
292313

293-
# Restore terminal mode
314+
# Restore terminal mode and Windows console settings
294315
if terminal_state is not None:
295-
_restore_terminal_mode(terminal_state[0], terminal_state[1])
316+
_restore_terminal_mode(terminal_state[0], terminal_state[1], windows_console_state)
296317

297318
# Restore stdin to blocking mode
298319
if not IS_WINDOWS and fd is not None and fl is not None:
@@ -309,10 +330,10 @@ def cleanup():
309330
logger.debug("Error closing WebSocket connection: %s", e)
310331

311332
# Register cleanup for SIGTERM
312-
def signal_handler(signum, frame):
333+
def signal_handler(signum, _frame):
313334
logger.info("Received signal %d, cleaning up...", signum)
314-
cleanup()
315-
sys.exit(0)
335+
# Raise SystemExit to trigger finally block and normal cleanup
336+
raise SystemExit(0)
316337

317338
original_sigterm = None
318339
if hasattr(signal, 'SIGTERM'):
@@ -359,19 +380,24 @@ def signal_handler(signum, frame):
359380
if IS_WINDOWS:
360381
import ctypes
361382
kernel32 = ctypes.windll.kernel32
383+
384+
# Save original console settings
385+
original_output_cp = kernel32.GetConsoleOutputCP()
386+
original_input_cp = kernel32.GetConsoleCP()
387+
STD_OUTPUT_HANDLE = -11
388+
stdout_handle = kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
389+
original_mode = ctypes.c_uint32()
390+
kernel32.GetConsoleMode(stdout_handle, ctypes.byref(original_mode))
391+
windows_console_state = (original_output_cp, original_input_cp,
392+
original_mode.value, stdout_handle)
393+
362394
# Set console output code page to UTF-8 (65001)
363395
kernel32.SetConsoleOutputCP(65001)
364396
# Set console input code page to UTF-8
365397
kernel32.SetConsoleCP(65001)
366398
# Enable VT100 processing for ANSI escape sequences
367-
# Get stdout handle
368-
STD_OUTPUT_HANDLE = -11
369-
stdout_handle = kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
370-
mode = ctypes.c_uint32()
371-
kernel32.GetConsoleMode(stdout_handle, ctypes.byref(mode))
372-
# Enable ENABLE_VIRTUAL_TERMINAL_PROCESSING (0x0004)
373399
ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
374-
kernel32.SetConsoleMode(stdout_handle, mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING)
400+
kernel32.SetConsoleMode(stdout_handle, original_mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING)
375401

376402
# Put terminal in raw mode to disable local echo
377403
if tty:
@@ -426,16 +452,13 @@ def resize_handler(signum, frame):
426452
# Handle stdout
427453
if resp.peek_stdout():
428454
stdout_data = resp.read_stdout()
429-
# Use the same buffer size as Kubernetes client-go (32 Kib)
430-
# Ref: https://github.com/kubernetes/client-go/blob/master/transport/websocket/roundtripper.go#L67
431-
# "BlockingIOError: [Errno 35] write could not complete without blocking", which is caused by wrting
432-
# to full pipe buffer and is easily reproducible on macOS, whose default output buffer size is 64 Kib
433-
# whose default output buffer size is 64 Kib. but using 32Kib here for consistency
434-
_BUF = 32 * 1024
435455
data = stdout_data.encode()
436456

437-
for start in range(0, len(data), _BUF):
438-
chunk = data[start: start + _BUF]
457+
# Write in chunks to avoid blocking on full pipe buffer
458+
# This prevents "BlockingIOError: [Errno 35] write could not complete without blocking"
459+
# which is easily reproducible on macOS (default pipe buffer: 64 KiB)
460+
for start in range(0, len(data), WEBSOCKET_BUFFER_SIZE):
461+
chunk = data[start: start + WEBSOCKET_BUFFER_SIZE]
439462

440463
while True:
441464
try:
@@ -444,7 +467,7 @@ def resize_handler(signum, frame):
444467
except BlockingIOError as exc:
445468
if exc.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
446469
raise # unexpected error
447-
time.sleep(0) # yield let LM Studio drain pipe
470+
time.sleep(0) # yield to let the system drain the pipe buffer
448471

449472
# Handle stderr
450473
if resp.peek_stderr():
@@ -462,8 +485,6 @@ def resize_handler(signum, frame):
462485
if data:
463486
try:
464487
resp.write_stdin(data)
465-
except BlockingIOError as e:
466-
logger.debug("stdin write blocked: %s", e)
467488
except OSError as e:
468489
if _is_blocking_error(e):
469490
logger.debug("stdin write blocked: %s", e)
@@ -475,12 +496,15 @@ def resize_handler(signum, frame):
475496
else:
476497
# Unix/Linux/macOS: Use select for non-blocking input
477498
if select.select([sys.stdin], [], [], 0)[0]:
478-
data = sys.stdin.read()
499+
# Read up to 1024 bytes to avoid blocking
500+
# Even with O_NONBLOCK set, stdin.read() without args can block
501+
try:
502+
data = os.read(sys.stdin.fileno(), 1024).decode('utf-8', errors='replace')
503+
except BlockingIOError:
504+
data = None
479505
if data:
480506
try:
481507
resp.write_stdin(data)
482-
except BlockingIOError as e:
483-
logger.debug("stdin write blocked: %s", e)
484508
except OSError as e:
485509
if _is_blocking_error(e):
486510
logger.debug("stdin write blocked: %s", e)

0 commit comments

Comments
 (0)