Skip to content

Commit 9e4d084

Browse files
committed
refactor(tests): streamline output handling in DagsterTestContext
- Moved the `stream_output` function into a private method `_stream_output` for better encapsulation and clarity. - Enhanced `_run_command` method to utilize the new `_stream_output` method for real-time output streaming from subprocesses. - Added logic to ensure the working directory is restored after command execution, improving reliability. - Cleaned up print statements for better context during command execution.
1 parent 4bd5ba0 commit 9e4d084

File tree

1 file changed

+105
-87
lines changed

1 file changed

+105
-87
lines changed

dagster_sqlmesh/conftest.py

Lines changed: 105 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import datetime as dt
2+
import io
23
import json
34
import logging
45
import os
6+
import queue
57
import shutil
68
import subprocess
79
import sys
810
import tempfile
11+
import threading
912
import typing as t
1013
from dataclasses import dataclass, field
1114

@@ -541,109 +544,124 @@ class DagsterTestContext:
541544
dagster_project_path: str
542545
sqlmesh_project_path: str
543546

547+
def _stream_output(
548+
self,
549+
pipe: t.IO[str],
550+
output_queue: queue.Queue[tuple[str, str | None]],
551+
process_stdout: t.IO[str],
552+
) -> None:
553+
"""Stream output from a pipe to a queue.
554+
555+
Args:
556+
pipe: The pipe to read from (stdout or stderr)
557+
output_queue: Queue to write output to, as (stream_type, line) tuples
558+
process_stdout: The stdout pipe from the process, used to determine stream type
559+
"""
560+
# Use a StringIO buffer to accumulate characters into lines
561+
buffer = io.StringIO()
562+
stream_type = "stdout" if pipe is process_stdout else "stderr"
563+
564+
try:
565+
while True:
566+
char = pipe.read(1)
567+
if not char:
568+
# Flush any remaining content in buffer
569+
remaining = buffer.getvalue()
570+
if remaining:
571+
output_queue.put((stream_type, remaining))
572+
break
573+
574+
buffer.write(char)
575+
576+
# If we hit a newline, flush the buffer
577+
if char == "\n":
578+
output_queue.put((stream_type, buffer.getvalue()))
579+
buffer = io.StringIO()
580+
finally:
581+
buffer.close()
582+
output_queue.put((stream_type, None)) # Signal EOF
583+
544584
def _run_command(self, cmd: list[str], cwd: str | None = None) -> None:
545585
"""Execute a command and stream its output in real-time.
546586
547587
Args:
548588
cmd: List of command parts to execute
589+
cwd: Optional directory to change to before running the command.
549590
550591
Raises:
551592
subprocess.CalledProcessError: If the command returns non-zero exit code
552593
"""
553-
import io
554-
import queue
555-
import threading
556-
import typing as t
557-
558-
def stream_output(
559-
pipe: t.IO[str], output_queue: queue.Queue[tuple[str, str | None]]
560-
) -> None:
561-
"""Stream output from a pipe to a queue.
562-
563-
Args:
564-
pipe: The pipe to read from (stdout or stderr)
565-
output_queue: Queue to write output to, as (stream_type, line) tuples
566-
"""
567-
# Use a StringIO buffer to accumulate characters into lines
568-
buffer = io.StringIO()
569-
stream_type = "stdout" if pipe is process.stdout else "stderr"
570-
571-
try:
572-
while True:
573-
char = pipe.read(1)
574-
if not char:
575-
# Flush any remaining content in buffer
576-
remaining = buffer.getvalue()
577-
if remaining:
578-
output_queue.put((stream_type, remaining))
579-
break
580-
581-
buffer.write(char)
582-
583-
# If we hit a newline, flush the buffer
584-
if char == "\n":
585-
output_queue.put((stream_type, buffer.getvalue()))
586-
buffer = io.StringIO()
587-
finally:
588-
buffer.close()
589-
output_queue.put((stream_type, None)) # Signal EOF
594+
original_cwd = os.getcwd()
590595

591596
print(f"Running command: {' '.join(cmd)}")
592-
print(f"Current working directory: {os.getcwd()}")
593-
594-
if cwd:
595-
print(f"Changing to directory: {cwd}")
596-
os.chdir(cwd)
597-
598-
process = subprocess.Popen(
599-
cmd,
600-
stdout=subprocess.PIPE,
601-
stderr=subprocess.PIPE,
602-
text=True,
603-
universal_newlines=True,
604-
encoding="utf-8",
605-
errors="replace",
606-
)
597+
print(f"Original working directory: {original_cwd}")
607598

608-
if not process.stdout or not process.stderr:
609-
raise RuntimeError("Failed to open subprocess pipes")
599+
process = None
600+
try:
601+
if cwd:
602+
print(f"Changing to directory: {cwd}")
603+
os.chdir(cwd)
604+
else:
605+
print(f"Running in current directory: {original_cwd}")
606+
607+
process = subprocess.Popen(
608+
cmd,
609+
stdout=subprocess.PIPE,
610+
stderr=subprocess.PIPE,
611+
text=True,
612+
universal_newlines=True,
613+
encoding="utf-8",
614+
errors="replace",
615+
)
610616

611-
# Create a single queue for all output
612-
output_queue: queue.Queue[tuple[str, str | None]] = queue.Queue()
617+
if not process.stdout or not process.stderr:
618+
raise RuntimeError("Failed to open subprocess pipes")
613619

614-
# Start threads to read from pipes
615-
stdout_thread = threading.Thread(
616-
target=stream_output, args=(process.stdout, output_queue)
617-
)
618-
stderr_thread = threading.Thread(
619-
target=stream_output, args=(process.stderr, output_queue)
620-
)
620+
# Create a single queue for all output
621+
output_queue: queue.Queue[tuple[str, str | None]] = queue.Queue()
621622

622-
stdout_thread.daemon = True
623-
stderr_thread.daemon = True
624-
stdout_thread.start()
625-
stderr_thread.start()
626-
627-
# Track which streams are still active
628-
active_streams = {"stdout", "stderr"}
629-
630-
# Read from queue and print output
631-
while active_streams:
632-
try:
633-
stream_type, content = output_queue.get(timeout=0.1)
634-
if content is None:
635-
active_streams.remove(stream_type)
636-
else:
637-
print(content, end="", flush=True)
638-
except queue.Empty:
639-
continue
640-
641-
stdout_thread.join()
642-
stderr_thread.join()
643-
process.wait()
623+
# Start threads to read from pipes
624+
stdout_thread = threading.Thread(
625+
target=self._stream_output,
626+
args=(process.stdout, output_queue, process.stdout),
627+
)
628+
stderr_thread = threading.Thread(
629+
target=self._stream_output,
630+
args=(process.stderr, output_queue, process.stdout),
631+
)
644632

645-
if process.returncode != 0:
646-
raise subprocess.CalledProcessError(process.returncode, cmd)
633+
stdout_thread.daemon = True
634+
stderr_thread.daemon = True
635+
stdout_thread.start()
636+
stderr_thread.start()
637+
638+
# Track which streams are still active
639+
active_streams = {"stdout", "stderr"}
640+
641+
# Read from queue and print output
642+
while active_streams:
643+
try:
644+
stream_type, content = output_queue.get(timeout=0.1)
645+
if content is None:
646+
active_streams.remove(stream_type)
647+
else:
648+
print(content, end="", flush=True)
649+
except queue.Empty:
650+
continue
651+
652+
stdout_thread.join()
653+
stderr_thread.join()
654+
process.wait()
655+
656+
if process.returncode != 0:
657+
raise subprocess.CalledProcessError(process.returncode, cmd)
658+
finally:
659+
# Ensure we change back to the original directory
660+
if os.getcwd() != original_cwd:
661+
print(f"Changing back to original directory: {original_cwd}")
662+
os.chdir(original_cwd)
663+
else:
664+
print(f"Remained in original directory: {original_cwd}")
647665

648666
def asset_materialisation(
649667
self,

0 commit comments

Comments
 (0)