Skip to content

Commit a229a07

Browse files
authored
Merge pull request #2 from dacort/fix/big-files
Add streaming output
2 parents ef6ee33 + 905fc4f commit a229a07

File tree

3 files changed

+71
-29
lines changed

3 files changed

+71
-29
lines changed

src/pyssm_client/cli/main.py

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ def exec_command(
392392
endpoint_url: str | None,
393393
timeout: int,
394394
) -> None:
395-
"""Execute a single command and return stdout/stderr/exit code."""
395+
"""Execute a single command with real-time streaming output."""
396396
try:
397397
result = run_command_sync(
398398
target=target,
@@ -401,22 +401,10 @@ def exec_command(
401401
region=region,
402402
endpoint_url=endpoint_url,
403403
timeout=timeout,
404+
stream_output=True,
404405
)
405-
# Print streams; exit with command exit code
406-
if result.stdout:
407-
try:
408-
sys.stdout.buffer.write(result.stdout)
409-
sys.stdout.buffer.flush()
410-
except Exception:
411-
click.echo(result.stdout.decode("utf-8", errors="replace"), nl=False)
412-
if result.stderr:
413-
try:
414-
sys.stderr.buffer.write(result.stderr)
415-
sys.stderr.buffer.flush()
416-
except Exception:
417-
click.echo(
418-
result.stderr.decode("utf-8", errors="replace"), nl=False, err=True
419-
)
406+
# Output is automatically streamed during execution
407+
# Just exit with the command's exit code
420408
sys.exit(result.exit_code)
421409
except Exception as e:
422410
click.echo(f"Execution failed: {e}", err=True)

src/pyssm_client/file_transfer/client.py

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -339,10 +339,10 @@ async def _create_ssm_session(
339339

340340
async def _setup_data_channel(self, session_data: dict) -> tuple[Any, Any]:
341341
"""Set up data channel for file transfer."""
342-
from ..communicator.data_channel import SessionDataChannel
343342
from ..cli.types import ConnectArguments
344-
from ..session.registry import get_session_registry
343+
from ..communicator.data_channel import SessionDataChannel
345344
from ..session.plugins import StandardStreamPlugin
345+
from ..session.registry import get_session_registry
346346
from ..session.session_handler import SessionHandler
347347

348348
# Create session object first
@@ -507,15 +507,6 @@ async def _upload_base64(
507507
# Small delay to avoid overwhelming remote
508508
await asyncio.sleep(0.005)
509509

510-
if options.progress_callback:
511-
try:
512-
import sys
513-
514-
sys.stdout.write("\n")
515-
sys.stdout.flush()
516-
except Exception:
517-
pass
518-
519510
self.logger.info("All chunks sent; waiting for remote flush...")
520511
remote_size = await self._wait_for_remote_size(
521512
temp_remote,

src/pyssm_client/utils/command.py

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ async def run_command(
7474
region: Optional[str] = None,
7575
endpoint_url: Optional[str] = None,
7676
timeout: int = 600,
77+
stream_output: bool = False,
7778
) -> CommandResult:
7879
"""Execute a single command on target and return stdout/stderr/exit_code.
7980
@@ -86,6 +87,7 @@ async def run_command(
8687
region: AWS region
8788
endpoint_url: Custom AWS endpoint URL
8889
timeout: Command timeout in seconds
90+
stream_output: Whether to stream filtered output to stdout/stderr in real-time
8991
9092
Returns:
9193
CommandResult with separated stdout, stderr, and exit code
@@ -146,9 +148,40 @@ async def run_command(
146148
session_done = asyncio.Event()
147149
exit_code = 0
148150

151+
# Line buffers for streaming
152+
stdout_line_buf = bytearray()
153+
stderr_line_buf = bytearray()
154+
149155
def handle_stdout(data: bytes) -> None:
150156
"""Handle stdout from shell."""
151-
nonlocal stdout_buf, exit_code
157+
nonlocal stdout_buf, exit_code, stdout_line_buf
158+
159+
# Add to line buffer for streaming
160+
stdout_line_buf.extend(data)
161+
162+
# Process complete lines for streaming
163+
while b"\n" in stdout_line_buf:
164+
line_end = stdout_line_buf.index(b"\n")
165+
line = stdout_line_buf[: line_end + 1]
166+
stdout_line_buf = stdout_line_buf[line_end + 1 :]
167+
168+
# Apply existing filter to the line and stream if requested
169+
if stream_output:
170+
filtered = _filter_shell_output(line, command)
171+
if filtered and filtered.strip():
172+
try:
173+
import sys
174+
175+
sys.stdout.buffer.write(filtered)
176+
sys.stdout.buffer.flush()
177+
except Exception:
178+
try:
179+
import sys
180+
181+
sys.stdout.write(filtered.decode("utf-8", errors="replace"))
182+
sys.stdout.flush()
183+
except Exception:
184+
pass
152185

153186
# Check for exit status marker in stdout
154187
try:
@@ -172,7 +205,35 @@ def handle_stdout(data: bytes) -> None:
172205

173206
def handle_stderr(data: bytes) -> None:
174207
"""Handle stderr from shell."""
175-
nonlocal stderr_buf
208+
nonlocal stderr_buf, stderr_line_buf
209+
210+
# Add to line buffer for streaming
211+
stderr_line_buf.extend(data)
212+
213+
# Process complete lines for streaming
214+
while b"\n" in stderr_line_buf:
215+
line_end = stderr_line_buf.index(b"\n")
216+
line = stderr_line_buf[: line_end + 1]
217+
stderr_line_buf = stderr_line_buf[line_end + 1 :]
218+
219+
# Apply existing filter to stderr line and stream if requested
220+
if stream_output:
221+
filtered = _filter_shell_output(line, command)
222+
if filtered and filtered.strip():
223+
try:
224+
import sys
225+
226+
sys.stderr.buffer.write(filtered)
227+
sys.stderr.buffer.flush()
228+
except Exception:
229+
try:
230+
import sys
231+
232+
sys.stderr.write(filtered.decode("utf-8", errors="replace"))
233+
sys.stderr.flush()
234+
except Exception:
235+
pass
236+
176237
stderr_buf.extend(data)
177238

178239
def handle_closed() -> None:
@@ -250,6 +311,7 @@ def run_command_sync(
250311
region: Optional[str] = None,
251312
endpoint_url: Optional[str] = None,
252313
timeout: int = 600,
314+
stream_output: bool = False,
253315
) -> CommandResult:
254316
"""Synchronous wrapper for run_command()."""
255317
return asyncio.run(
@@ -260,5 +322,6 @@ def run_command_sync(
260322
region=region,
261323
endpoint_url=endpoint_url,
262324
timeout=timeout,
325+
stream_output=stream_output,
263326
)
264327
)

0 commit comments

Comments
 (0)