Empty file added app/__init__.py
Empty file.
13 changes: 13 additions & 0 deletions app/config.py
@@ -0,0 +1,13 @@
import os

# Read environment variables.
QUEUE_NAME = os.getenv("REDIS_QUEUE_NAME", "task_queue")
MINIO_URL = os.getenv("MINIO_URL", "localhost:9000")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "minioadmin")
COCKROACHDB_URL = os.getenv(
"COCKROACHDB_URL", "postgresql://root@localhost:26257/defaultdb?sslmode=disable"
)
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
MESSAGE_RETRY_DELAY = 10
LOG_DATA_FIELD = b"log_data"
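
These values are read once at import time, so any override must land in the environment before app.config is first imported. A minimal sketch of that ordering constraint (the override value is illustrative):

import os

# Must be set before app.config is imported anywhere in the process;
# the value here is illustrative.
os.environ["REDIS_URL"] = "redis://redis:6379/0"

from app import config

print(config.REDIS_URL)  # -> redis://redis:6379/0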
43 changes: 43 additions & 0 deletions app/database.py
@@ -0,0 +1,43 @@
import psycopg2
from app.config import COCKROACHDB_URL

def update_run_status(run_id, status):
"""Updates the status of a run in the database."""
conn = None
try:
conn = psycopg2.connect(COCKROACHDB_URL)
cur = conn.cursor()
print(f"Updating run {run_id} status to '{status}'.")
cur.execute("UPDATE run SET status = %s WHERE id = %s", (status, run_id))
conn.commit()
except psycopg2.Error as e:
print(f"Error updating run status in CockroachDB: {e}")
finally:
if conn:
if "cur" in locals() and cur:
cur.close()
conn.close()

def get_run_type(run_id):
"""Fetches the type of the run from the database."""
conn = None
run_type = "scoop" # Default
try:
conn = psycopg2.connect(COCKROACHDB_URL)
cur = conn.cursor()
cur.execute("SELECT type FROM run WHERE id = %s", (run_id,))
result = cur.fetchone()
if result:
run_type = result[0]
print(f"Run type: {run_type}")
else:
print(f"Warning: Could not fetch run type for {run_id}")
except psycopg2.Error as e:
print(f"Error fetching run type in CockroachDB: {e}")
raise e # Re-raise to handle upstream
finally:
if conn:
if "cur" in locals() and cur:
cur.close()
conn.close()
return run_type
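
The queries above assume a run table with id, type, and status columns, but the schema itself is not part of this diff. The DDL below is therefore an inferred sketch; column types and defaults are assumptions:

import psycopg2
from app.config import COCKROACHDB_URL

# Hypothetical schema inferred from the queries above; not part of this diff.
DDL = """
CREATE TABLE IF NOT EXISTS run (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    type STRING NOT NULL DEFAULT 'scoop',
    status STRING NOT NULL DEFAULT 'pending'
);
"""

conn = psycopg2.connect(COCKROACHDB_URL)
with conn, conn.cursor() as cur:  # Commits on successful exit.
    cur.execute(DDL)
conn.close()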
95 changes: 95 additions & 0 deletions app/executor.py
@@ -0,0 +1,95 @@
import os
import threading
import subprocess
from app.config import REDIS_URL
Review comment (Member): remove unused import (REDIS_URL is not used in this module).
from app.streaming import stream_reader, redis_stream_adder
Review comment (Member): remove unused import (redis_stream_adder is imported here but never used; only stream_reader is).

def run_subprocess(run_id, local_file_path, file_parent_dir, run_type, log_queue, redis_adder_thread):
Review comment (Member): can we log the run_id? It's currently unused, so logging it would give the parameter a purpose rather than keeping it in the signature for its own sake.
"""Executes the code in a subprocess and manages streams."""
process = None
stdout_thread = None
stderr_thread = None
run_status = "failed"

    if run_type == "ml":
        command = ["python", local_file_path]
    else:
        command = ["python", "-m", "scoop", local_file_path]
timeout_sec = 3600
print(f"Running command: {' '.join(command)} in {file_parent_dir}")
print(f"Timeout: {timeout_sec} seconds")

try:
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=file_parent_dir,
)

# Start reader threads if Redis is enabled.
if redis_adder_thread:
stdout_thread = threading.Thread(
target=stream_reader,
args=(process.stdout, "stdout", log_queue),
daemon=True,
)
stderr_thread = threading.Thread(
target=stream_reader,
args=(process.stderr, "stderr", log_queue),
daemon=True,
)
stdout_thread.start()
stderr_thread.start()
        else:
            print("Redis not configured. Subprocess output will not be streamed.")
            # Drain both pipes on background threads so the child cannot block
            # on a full stderr pipe while stdout is being read to EOF.
            stdout_thread = threading.Thread(target=process.stdout.read, daemon=True)
            stderr_thread = threading.Thread(target=process.stderr.read, daemon=True)
            stdout_thread.start()
            stderr_thread.start()

# Wait for Process Completion.
print(f"Waiting for subprocess (PID: {process.pid}) to complete...")
return_code = process.wait(timeout=timeout_sec)
print(f"Subprocess finished with return code: {return_code}")
if return_code == 0:
if any(
file.endswith(".txt")
for file in os.listdir(file_parent_dir)
if os.path.isfile(os.path.join(file_parent_dir, file))
):
run_status = "completed"
else:
print(
"Warning: Process exited with code 0 but expected output files not found."
)
run_status = "completed"
else:
run_status = "failed"
except subprocess.TimeoutExpired:
print(f"Subprocess timed out after {timeout_sec} seconds. Terminating...")
process.terminate()
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
run_status = "timed_out"
print("Subprocess terminated due to timeout.")
except Exception as e:
print(f"Error waiting for subprocess: {e}")
run_status = "failed"
if process and process.poll() is None:
process.kill()
process.wait()

# Wait for Log Streaming to Finish.
if stdout_thread:
stdout_thread.join(timeout=10)
if stdout_thread.is_alive():
print("Warning: stdout reader thread did not finish.")
if stderr_thread:
stderr_thread.join(timeout=10)
if stderr_thread.is_alive():
print("Warning: stderr reader thread did not finish.")

return run_status, process
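
The worker that calls run_subprocess is not part of this diff, so the wiring below is a sketch under assumptions: execute_run is a hypothetical name, and the adder thread doubles as the "Redis is enabled" flag, mirroring the redis_adder_thread check above.

import os
import queue
import threading

from app.config import REDIS_URL
from app.executor import run_subprocess
from app.streaming import redis_stream_adder

def execute_run(run_id, local_file_path, run_type):
    """Hypothetical caller: start the log adder, then run the code."""
    log_queue = queue.Queue()
    adder = threading.Thread(
        target=redis_stream_adder,
        args=(REDIS_URL, log_queue, run_id),
        daemon=True,
    )
    adder.start()
    status, _process = run_subprocess(
        run_id,
        local_file_path,
        os.path.dirname(local_file_path),
        run_type,
        log_queue,
        adder,
    )
    # Let the adder flush remaining entries and the EOF marker.
    adder.join(timeout=30)
    return status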
50 changes: 50 additions & 0 deletions app/storage.py
@@ -0,0 +1,50 @@
import os
from minio import Minio
from minio.error import S3Error
from app.config import MINIO_URL, MINIO_ACCESS_KEY, MINIO_SECRET_KEY

def download_file(run_id, file_name, extension):
"""Downloads a file from MinIO storage."""
BUCKET_NAME = "code"
try:
minio_client = Minio(
MINIO_URL,
access_key=MINIO_ACCESS_KEY,
secret_key=MINIO_SECRET_KEY,
secure=False,
)
object_name = f"{run_id}/{file_name}.{extension}"
download_path = f"code/{run_id}/{file_name}.{extension}"
os.makedirs(os.path.dirname(download_path), exist_ok=True)
minio_client.fget_object(BUCKET_NAME, object_name, download_path)
print(f"Successfully downloaded {object_name} to {download_path}")
return os.path.abspath(download_path)
except S3Error as exc:
print(f"Failed to download {object_name}: {exc}")
raise exc
except Exception as e:
print(f"An unexpected error occurred during download: {e}")
raise e


def upload_file(run_id, file_path):
"""Uploads a file to MinIO storage."""
BUCKET_NAME = "code"
try:
minio_client = Minio(
MINIO_URL,
access_key=MINIO_ACCESS_KEY,
secret_key=MINIO_SECRET_KEY,
secure=False,
)
# Extract filename from the full path.
file_name_with_ext = os.path.basename(file_path)
object_name = f"{run_id}/{file_name_with_ext}"
minio_client.fput_object(BUCKET_NAME, object_name, file_path)
print(f"Successfully uploaded {file_path} as {object_name}")
except S3Error as exc:
print(f"Failed to upload {file_path}: {exc}")
raise exc
except Exception as e:
print(f"An unexpected error occurred during upload: {e}")
raise e
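
A hedged usage sketch of the two helpers together; the run id, the file names, and the assumption that the "code" bucket already exists are all illustrative:

import os
from app.storage import download_file, upload_file

# Hypothetical values; the real ones come from the queued task message.
local_path = download_file("run-123", "main", "py")

# ... execute the script, producing results.txt next to it ...

results = os.path.join(os.path.dirname(local_path), "results.txt")
upload_file("run-123", results)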
150 changes: 150 additions & 0 deletions app/streaming.py
@@ -0,0 +1,150 @@
import json
import time
import queue
import redis
import threading
Review comment (Member): unused import (threading is not referenced in this module).
from app.config import LOG_DATA_FIELD

STREAM_END = object()

def stream_reader(stream, stream_name, log_queue):
"""Reads lines from a stream and puts them onto the queue."""
try:
for line in iter(stream.readline, b""):
try:
decoded_line = line.decode("utf-8").rstrip()
log_entry = json.dumps({"stream": stream_name, "line": decoded_line})
log_queue.put(log_entry)
except UnicodeDecodeError:
log_entry = json.dumps(
{"stream": stream_name, "line": repr(line)[2:-1]}
)
log_queue.put(log_entry)
except Exception as e:
print(f"Error processing log line from {stream_name}: {e}")
print(f"Stream reader for {stream_name} finished.")
except Exception as e:
print(f"Error in stream reader for {stream_name}: {e}")
finally:
log_queue.put(STREAM_END)
print(f"Stream reader for {stream_name} exiting (sent STREAM_END).")
if stream:
stream.close()


def redis_stream_adder(redis_url, log_queue, run_id):
"""Connects to Redis and adds logs from the queue to a run-specific Redis Stream."""
r = None
active_streams = 2 # (stdout, stderr)
connection_attempts = 0
max_attempts = 5
retry_delay = 2
stream_name = run_id # Use run_id as the stream name
    stream_ttl_seconds = 120

# Redis Connection Loop.
while connection_attempts < max_attempts:
try:
print(f"Attempting to connect to Redis for Stream: {redis_url}")
r = redis.from_url(redis_url, decode_responses=False)
r.ping()
print(f"Redis Stream connection established to {redis_url}")
break
except redis.exceptions.ConnectionError as e:
connection_attempts += 1
print(
f"Redis Stream connection failed (Attempt {connection_attempts}/{max_attempts}): {e}"
)
if connection_attempts >= max_attempts:
print(
"Max connection attempts reached. Cannot add logs to Redis Stream."
)
# Drain queue.
while active_streams > 0:
item = log_queue.get()
if item is STREAM_END:
active_streams -= 1
log_queue.task_done()
return
print(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
retry_delay *= 2
except Exception as e:
print(f"Unexpected error during Redis Stream connection: {e}")
# Drain queue.
while active_streams > 0:
item = log_queue.get()
if item is STREAM_END:
active_streams -= 1
log_queue.task_done()
return

# Log Adding Loop.
entries_added = 0
try:
while active_streams > 0:
try:
log_entry = log_queue.get(timeout=300)
except queue.Empty:
print(
"Log queue empty timeout reached. Checking Redis stream connection."
)
try:
if r:
r.ping()
else:
break
except redis.exceptions.ConnectionError:
print("Redis stream connection lost unexpectedly.")
break
continue

if log_entry is STREAM_END:
active_streams -= 1
print(
f"Received stream end signal. Active streams remaining: {active_streams}"
)
elif isinstance(log_entry, str):
try:
if r:
payload = {LOG_DATA_FIELD: log_entry}
                        r.xadd(stream_name, payload)
                        r.expire(stream_name, stream_ttl_seconds)
entries_added += 1
else:
print("Cannot add log to stream, Redis is not connected.")
except redis.exceptions.RedisError as e:
print(f"Error using XADD for Redis Stream '{stream_name}': {e}")
except Exception as e:
print(f"Unexpected error during XADD: {e}")
break

log_queue.task_done()

# After both streams end
if r:
# Add an End-Of-File marker message to the stream.
eof_message = json.dumps({"status": "EOF", "runId": run_id})
eof_payload = {LOG_DATA_FIELD: eof_message}
try:
entry_id = r.xadd(stream_name, eof_payload)
                r.expire(stream_name, stream_ttl_seconds)
print(
f"Added EOF marker to Redis Stream '{stream_name}', ID: {entry_id.decode()}"
)
entries_added += 1
except redis.exceptions.RedisError as e:
print(f"Error adding EOF marker to Redis Stream: {e}")

except Exception as e:
print(f"Error in Redis stream adder loop: {e}")
finally:
print(
f"Redis stream adder thread finishing. Total entries added: {entries_added}"
)
if r:
try:
r.close()
print("Redis stream connection closed.")
except Exception as e:
print(f"Error closing Redis stream connection: {e}")
10 changes: 10 additions & 0 deletions app/utils.py
@@ -0,0 +1,10 @@
import json

def parse_json_string(json_string):
"""Parses a JSON string and returns a Python object."""
try:
data = json.loads(json_string)
return data
except json.JSONDecodeError as e:
print(f"Error decoding JSON: {e}")
return None
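
The None return keeps malformed queue messages from crashing the caller; a short sketch of the intended calling pattern (the message shape is an assumption):

from app.utils import parse_json_string

# Hypothetical task message pulled off the Redis queue.
task = parse_json_string('{"runId": "run-123", "fileName": "main"}')
if task is None:
    print("Skipping malformed task message.")
else:
    print(task["runId"])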