diff --git a/app-backend/Dockerfile b/app-backend/Dockerfile index 9688a4b..eddbb4d 100644 --- a/app-backend/Dockerfile +++ b/app-backend/Dockerfile @@ -14,8 +14,9 @@ WORKDIR /home/user/ RUN git clone --depth 1 https://github.com/opea-project/GenAIComps.git WORKDIR /home/user/GenAIComps -RUN pip install --no-cache-dir --upgrade pip==24.3.1 setuptools==75.3.0 && \ - pip install --no-cache-dir -r /home/user/GenAIComps/requirements.txt +RUN pip install --no-cache-dir --upgrade pip==24.3.1 setuptools==78.1.1 && \ + pip install --no-cache-dir -r /home/user/GenAIComps/requirements.txt && \ + pip install --no-cache-dir --upgrade mcp==1.10.0 pillow==11.3.0 COPY ./templates/microservices/* /home/user/templates/microservices/ COPY ./megaservice.py /home/user/megaservice.py diff --git a/studio-backend/Dockerfile b/studio-backend/Dockerfile index 4884c5d..00b557d 100644 --- a/studio-backend/Dockerfile +++ b/studio-backend/Dockerfile @@ -13,7 +13,7 @@ RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missin rm -rf /var/lib/apt/lists/* # Upgrade setuptools to a safe version and install any needed packages specified in requirements.txt -RUN pip install --no-cache-dir --upgrade pip==24.3.1 setuptools==75.3.0 && \ +RUN pip install --no-cache-dir --upgrade pip==24.3.1 setuptools==78.1.1 && \ pip install --no-cache-dir -r /usr/src/app/requirements.txt # Define environment variable diff --git a/studio-backend/app/routers/clickdeploy_router.py b/studio-backend/app/routers/clickdeploy_router.py index 24a7bea..7e33520 100644 --- a/studio-backend/app/routers/clickdeploy_router.py +++ b/studio-backend/app/routers/clickdeploy_router.py @@ -1,275 +1,79 @@ -from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect -from fastapi.concurrency import run_in_threadpool -import paramiko +from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect, BackgroundTasks import asyncio -import json from app.models.pipeline_model import DeployPipelineFlow -from app.services.clickdeploy_service import upload_pipeline_zip +from app.services.clickdeploy_service import async_click_deployment, async_check_deployment_status router = APIRouter() -@router.post("/upload-pipeline-files") -async def upload_pipeline_files(request: DeployPipelineFlow): - print("[DEBUG] Entered /upload-pipeline-files endpoint") +@router.post("/click-deployment") +async def click_deployment(request: DeployPipelineFlow, background_tasks: BackgroundTasks): + """Trigger click deployment asynchronously""" + print("[INFO] Entered /click-deployment endpoint") remote_host = request.remoteHost - remote_user = request.remoteUser + remote_user = request.remoteUser pipeline_flow = request.pipelineFlow - try: - print("[DEBUG] Calling upload_pipeline_zip...") - response = upload_pipeline_zip(remote_host, remote_user, pipeline_flow.dict()) - print("[DEBUG] upload_pipeline_zip returned") - except Exception as e: - print(f"[ERROR] Exception in /upload-pipeline-files: {e}") - raise HTTPException(status_code=500, detail=str(e)) - return response - -@router.websocket("/ws/deploy-and-monitor") -async def deploy_and_monitor_status(websocket: WebSocket): - print('deploying and monitoring status') + print(f"[DEBUG] pipeline_flow type: {type(pipeline_flow)}") + chatflow_id = pipeline_flow.id + print(f"[DEBUG] chatflow_id: {chatflow_id}") + + # Use the service layer async function directly with BackgroundTasks + background_tasks.add_task( + async_click_deployment, + remote_host, + remote_user, + pipeline_flow.dict(), + 
chatflow_id + ) + + return { + "status": "In Progress", + "message": "Deployment initiated...", + "chatflow_id": chatflow_id + } + +@router.websocket("/ws/monitor-click-deployment") +async def monitor_click_deployment_status(websocket: WebSocket): + """Monitor deployment status using docker ps commands""" + print('[INFO] Starting deployment status monitoring') await websocket.accept() - ssh_connection = None + try: data = await websocket.receive_json() - print("Received data: ", data) + print("Received monitoring data: ", data) remote_host = data["hostname"] remote_user = data["username"] - compose_dir = data["compose_dir"] - remote_zip_path = data.get("remote_zip_path", f"/home/{remote_user}/docker-compose.zip") + # chatflow_id = data.get("chatflow_id", "unknown") + compose_dir = "genaistudio-compose" # Standard directory name - # Step 1: Connect SSH - ssh = paramiko.SSHClient() - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect(remote_host, username=remote_user) - ssh_connection = ssh # Store for cleanup - - # Step 2: Stop existing services if directory exists - await websocket.send_json({"status": "Preparing", "message": "Checking for existing services..."}) - print(f"[DEBUG] Checking if {compose_dir} directory exists...") - _, stdout, _ = ssh.exec_command(f"ls -d {compose_dir}", get_pty=True) - exit_status = stdout.channel.recv_exit_status() - if exit_status == 0: - _, stdout, _ = ssh.exec_command(f"cd {compose_dir} && docker compose down", get_pty=True) - while not stdout.channel.exit_status_ready(): - await websocket.send_json({"status": "Preparing", "message": "Stopping any existing services..."}) - await asyncio.sleep(2) # Non-blocking sleep - # Clean up old files - ssh.exec_command(f"cd {compose_dir}; rm -f .env nohup.out app.nginx.conf.template compose.yaml workflow-info.json", get_pty=True) - else: - pass - - # Step 3: Extract files - await websocket.send_json({"status": "Preparing", "message": "Extracting files..."}) - extract_cmd = f"python3 -c \"import zipfile; zipfile.ZipFile('{remote_zip_path}').extractall('{compose_dir}')\"" - _, stdout, _ = ssh.exec_command(extract_cmd, get_pty=True) - if stdout.channel.recv_exit_status() != 0: - await websocket.send_json({"status": "Error", "error": "Failed to extract files using Python."}) - ssh.close() - await websocket.close() - return - await websocket.send_json({"status": "Preparing", "message": "Extraction complete."}) - # Clean up the uploaded docker-compose.zip file - _, stdout, _ = ssh.exec_command(f"rm -f {remote_zip_path}", get_pty=True) - - # Step 4: Start services - _, stdout, _ = ssh.exec_command(f"cd {compose_dir} && nohup docker compose up -d & sleep 0.1", get_pty=True) - ssh.close() - await websocket.send_json({"status": "Preparing", "message": "Deployment steps complete. 
Monitoring status..."}) + # Monitor deployment status by checking Docker services directly + # Give some time for initial deployment setup + await asyncio.sleep(2) - def check_status(): - ssh_check = None - try: - ssh_check = paramiko.SSHClient() - ssh_check.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh_check.connect(remote_host, username=remote_user) - - # Verify the directory exists and has content - print(f"[DEBUG] Checking directory: {compose_dir}") - _, stdout_ls, _ = ssh_check.exec_command(f"ls -la {compose_dir}") - # ls_output = stdout_ls.read().decode().strip() - # print(f"[DEBUG] Directory contents: {ls_output}") - - # Get the number of services defined in compose.yaml - _, stdout_num, _ = ssh_check.exec_command(f"cd {compose_dir} && docker compose config --services | wc -l") - num_services_output = stdout_num.read().decode().strip().splitlines() - num_services_lines = [line for line in num_services_output if not line.startswith('WARN') and line.strip()] - num_services_str = num_services_lines[-1] if num_services_lines else '0' - # print(f"[DEBUG] Number of services defined: {num_services_str}") - - # Run docker compose ps to get service status - _, stdout_ps, _ = ssh_check.exec_command(f"cd {compose_dir} && docker compose ps --all --format json") - out = stdout_ps.read().decode() - json_lines = [line for line in out.strip().splitlines() if line.strip() and not line.strip().startswith('WARN')] - out_filtered = '\n'.join(json_lines) - # print(f"[DEBUG] Docker compose ps output: {out_filtered}") - - # Read nohup.out for progress logs (always fetch latest 10 lines) - _, stdout_nohup, _ = ssh_check.exec_command(f"cd {compose_dir} && tail -n 10 nohup.out") - nohup_out_lines = stdout_nohup.read().decode().splitlines() - print(f"[DEBUG] Nohup output: {nohup_out_lines}") - - return _process_service_status(out_filtered, num_services_str, nohup_out_lines) - except Exception as e: - return {"error": f"Status check failed: {str(e)}"} - finally: - if ssh_check: - try: - ssh_check.close() - except: - pass - - def _process_service_status(out_filtered, num_services_str, nohup_out_lines): - try: - # If output contains multiple JSON objects, parse all and aggregate - json_lines = [line for line in out_filtered.strip().splitlines() if line.strip()] - all_services = [] - for line in json_lines: - try: - ps_data = json.loads(line) - if isinstance(ps_data, dict): - if 'services' in ps_data or 'containers' in ps_data: - services = ps_data.get('services') or ps_data.get('containers') or [] - if isinstance(services, dict): - services = list(services.values()) - elif not isinstance(services, list): - services = [] - all_services.extend(services) - else: - all_services.append(ps_data) - else: - all_services.append(ps_data) - except Exception as e: - return {"error": f"Failed to parse docker compose ps output: {line}\n{str(e)}"} - - if len(all_services) != int(num_services_str): - # If error in nohup.out, return as error - if any("error" in line.lower() or "fail" in line.lower() for line in nohup_out_lines): - return {"error": nohup_out_lines} - else: - print(f"[DEBUG] Docker pulling images..") - return { - "all_healthy": False, - "none_restarting": True, - "services_running": 0, - "services_exited": 0, - "services_defined": int(num_services_str), - "ps": all_services, - "error": None, - "nohup_out": nohup_out_lines - } - - # Define containers that are expected to complete and exit - expected_completed_containers = [ - "model-downloader", "downloader", "init", "setup", "migrate", "seed" - ] - - 
# Filter out expected completed containers from exited count - unexpected_exited_services = [] - expected_completed_services = [] - running_services = [] - - for s in all_services: - if isinstance(s, dict): - service_name = s.get("Name", "").lower() - state = s.get("State", "") - - if state == "running": - running_services.append(s) - elif state == "exited": - # Check if this is an expected completed container - is_expected_completed = any(keyword in service_name for keyword in expected_completed_containers) - if is_expected_completed: - expected_completed_services.append(s) - else: - unexpected_exited_services.append(s) - - services_exited = len(unexpected_exited_services) - services_running = len(running_services) - services_expected_completed = len(expected_completed_services) - - print(f"[DEBUG] Number of services deployed: {services_running} running, {services_exited} unexpectedly exited, {services_expected_completed} expected completed / {num_services_str} total") - all_healthy = all((not isinstance(s, dict)) or (s.get("Health", "") in ("", "healthy")) for s in all_services) - none_restarting = all(isinstance(s, dict) and s.get("State", "") != "restarting" for s in all_services) - - print(f"[DEBUG] all_healthy: {all_healthy}") - print(f"[DEBUG] none_restarting: {none_restarting}") - - return { - "all_healthy": all_healthy, - "none_restarting": none_restarting, - "services_running": services_running, - "services_exited": services_exited, - "services_expected_completed": services_expected_completed, - "services_defined": int(num_services_str), - "ps": all_services, - "error": None, - "nohup_out": nohup_out_lines - } - except Exception as e: - return {"error": str(e)} - while True: - result = await run_in_threadpool(check_status) - - if result["error"]: - await websocket.send_json({"status": "Error", "error": result["error"]}) + # Check Docker services directly using the service layer function + result = await async_check_deployment_status(remote_host, remote_user, compose_dir) + await websocket.send_json(result) + + # Continue monitoring until success or definitive error + if result["status"] == "Success": break - - if (int(result["services_running"]) + int(result["services_exited"]) + int(result.get("services_expected_completed", 0))) == result["services_defined"]: - # Only consider actually running services + expected completed services as "successful" - expected_running_services = result["services_defined"] - result.get("services_expected_completed", 0) - - if result["all_healthy"] and result["services_running"] == expected_running_services and result["services_exited"] == 0: - await asyncio.sleep(5) - recheck_result = await run_in_threadpool(check_status) - if recheck_result["none_restarting"]: - total_successful = result["services_running"] + result.get("services_expected_completed", 0) - await websocket.send_json({"status": "Done", "success": f"All {total_successful} services completed successfully ({result['services_running']} running, {result.get('services_expected_completed', 0)} completed). 
Open http://localhost:8090 in your machine's browser to access the application."}) - else: - restarting_services = [ - s.get("Name", "unknown") for s in recheck_result["ps"] - if isinstance(s, dict) and s.get("State", "") == "restarting" - ] - await websocket.send_json({"status": "Error", "error": f"Services stuck in restarting status: [{', '.join(restarting_services)}]"}) - else: - # Only report unexpected exited services as errors - if result["services_exited"] > 0: - exited_services = [ - s.get("Name", "unknown") for s in result["ps"] - if isinstance(s, dict) and s.get("State", "") == "exited" and not any(keyword in s.get("Name", "").lower() for keyword in ["model-downloader", "downloader", "init", "setup", "migrate", "seed"]) - ] - if exited_services: - await websocket.send_json({"status": "Error", "error": f"Services in exited state: [{', '.join(exited_services)}]"}) - else: - # All exited services are expected completed ones, continue monitoring - await websocket.send_json({"status": "In Progress", "ps": result["ps"], "nohup_out": result.get("nohup_out", [])}) - await asyncio.sleep(2) - continue - else: - await websocket.send_json({"status": "In Progress", "ps": result["ps"], "nohup_out": result.get("nohup_out", [])}) - await asyncio.sleep(2) - continue + elif result["status"] == "Error" and "Deployment directory not found" not in result["message"]: + # Only break on real errors, not setup/preparation states break - # Send nohup_out in progress status - await websocket.send_json({"status": "In Progress", "ps": result["ps"], "nohup_out": result.get("nohup_out", [])}) - await asyncio.sleep(2) + + await asyncio.sleep(2) # Check every 2 seconds to give services time to start + except WebSocketDisconnect: - print("Client disconnected from deploy-and-monitor WebSocket") + print("Client disconnected from monitor-click-deployment WebSocket") except Exception as e: - print(f"Error in deploy-and-monitor WebSocket: {e}") + print(f"Error in monitor-click-deployment WebSocket: {e}") try: - await websocket.send_json({"status": "Error", "error": f"Unexpected error: {str(e)}"}) + await websocket.send_json({"status": "Error", "message": f"Monitoring error: {str(e)}"}) except: - pass # WebSocket might already be closed + pass finally: - # Clean up SSH connection if it exists - if ssh_connection: - try: - ssh_connection.close() - print("SSH connection cleaned up") - except: - pass - # Ensure WebSocket is closed + # Clean up WebSocket connection try: if not websocket.client_state.name == 'DISCONNECTED': await websocket.close() diff --git a/studio-backend/app/services/clickdeploy_service.py b/studio-backend/app/services/clickdeploy_service.py index 955e52f..26faae4 100644 --- a/studio-backend/app/services/clickdeploy_service.py +++ b/studio-backend/app/services/clickdeploy_service.py @@ -6,66 +6,187 @@ import tempfile import json import zipfile -import datetime import time +import concurrent.futures +import asyncio from app.services.exporter_service import convert_proj_info_to_compose from app.services.workflow_info_service import WorkflowInfo from app.utils.exporter_utils import process_opea_services -def upload_pipeline_zip(hostname, username, pipeline_flow): - print("[INFO] Starting deployment to remote server...") - remote_zip_path = f"/home/{username}/docker-compose.zip" +async def async_click_deployment(remote_host, remote_user, pipeline_flow_dict, chatflow_id): + """ + Async wrapper for the click deployment process that can be used with BackgroundTasks + """ + try: + print(f"[INFO] Starting async 
deployment process for chatflow {chatflow_id}") + + # Run the synchronous deployment function in a thread pool + loop = asyncio.get_event_loop() + with concurrent.futures.ThreadPoolExecutor() as executor: + response = await loop.run_in_executor( + executor, + click_deploy_pipeline, + remote_host, + remote_user, + pipeline_flow_dict, + chatflow_id + ) + + print(f"[INFO] Async deployment process completed for chatflow {chatflow_id}: {response}") + return response + + except Exception as e: + print(f"[ERROR] Exception in async deployment process for chatflow {chatflow_id}: {e}") + import traceback + print(f"[ERROR] Traceback: {traceback.format_exc()}") + raise e + +def click_deploy_pipeline(hostname, username, pipeline_flow, chatflow_id): + """ + Main deployment function that handles the entire deployment process. + Detects existing deployment, stops it if necessary, uploads files, and starts services. + """ + print(f"[INFO] Starting click deployment for chatflow {chatflow_id}") + print(f"[DEBUG] Deployment parameters - hostname: {hostname}, username: {username}") + print(f"[DEBUG] pipeline_flow type: {type(pipeline_flow)}") temp_dir = None remote_compose_dir = "genaistudio-compose" + try: - print("[INFO] Creating ZIP locally...") - zip_path, temp_dir = create_zip_locally(pipeline_flow, hostname) - print(f"[INFO] ZIP created at {zip_path}") - - print("[INFO] Connecting to remote server via SSH...") + # Connect to remote server + print("[INFO] Connecting to remote server...") ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(hostname, username=username) - print("[INFO] SSH connection established.") + print("[INFO] Connected to remote server.") - print("[INFO] Opening SFTP session...") + # Check for existing deployment and stop if necessary + print("[INFO] Checking for existing deployment...") + _, stdout, _ = ssh.exec_command(f"ls -d {remote_compose_dir}", get_pty=True) + if stdout.channel.recv_exit_status() == 0: + print("[INFO] Existing deployment found. 
Stopping services...")
+            # Reset stale logs so the status monitor does not pick up output from a previous run
+            _, stdout, stderr = ssh.exec_command(f"cd {remote_compose_dir} && rm nohup.out", get_pty=True)
+            stop_existing_deployment(ssh, remote_compose_dir)
+            print("[INFO] Existing deployment stopped.")
+        else:
+            print("[INFO] No existing deployment found.")
+
+        # Upload the deployment package
+        print("[INFO] Creating deployment package...")
+        zip_path, temp_dir = create_zip_locally(pipeline_flow, hostname)
+        remote_zip_path = f"/home/{username}/docker-compose.zip"
+        print("[INFO] Uploading deployment package...")
         sftp = ssh.open_sftp()
-        print("[INFO] SFTP session opened.")
         sftp.put(zip_path, remote_zip_path)
-        print(f"[INFO] ZIP uploaded to {remote_zip_path}")
         sftp.close()
-        print("[INFO] SFTP session closed.")
+        print("[INFO] Deployment package uploaded.")
 
-        # Check if genaistudio-compose directory exists
-        print(f"[INFO] Checking if {remote_compose_dir} directory exists...")
-        _, stdout, stderr = ssh.exec_command(f"ls -d {remote_compose_dir}", get_pty=True)
-        exit_status = stdout.channel.recv_exit_status()
-        # Only upload and ensure directory, do not stop services here
-        if exit_status != 0:
-            print(f"[INFO] {remote_compose_dir} does not exist, will create new directory")
-            _, stdout, stderr = ssh.exec_command(f"mkdir -p {remote_compose_dir}", get_pty=True)
-            exit_status = stdout.channel.recv_exit_status()
-            stderr_str = stderr.read().decode().strip()
-            print(f"[INFO] Command exit status: {exit_status}")
-            if exit_status != 0:
-                print(f"[ERROR] Failed to create directory: {stderr_str}")
-                raise Exception(f"Failed to create remote directory: {stderr_str}")
+        # Create the directory and extract the files
+        print("[INFO] Extracting deployment files...")
+        ssh.exec_command(f"mkdir -p {remote_compose_dir}")
+        extract_cmd = f"python3 -c \"import zipfile; zipfile.ZipFile('{remote_zip_path}').extractall('{remote_compose_dir}')\""
+        _, stdout, stderr = ssh.exec_command(extract_cmd, get_pty=True)
+
+        if stdout.channel.recv_exit_status() != 0:
+            error_msg = stderr.read().decode().strip()
+            raise Exception(f"Failed to extract files: {error_msg}")
+
+        print("[INFO] Files extracted successfully.")
+
+        # Debug: list extracted files
+        print("[DEBUG] Listing extracted files...")
+        _, stdout_ls, _ = ssh.exec_command(f"ls -la {remote_compose_dir}", get_pty=True)
+        file_list = stdout_ls.read().decode().strip()
+        print(f"[DEBUG] Extracted files:\n{file_list}")
+
+        # Debug: check .env file contents
+        print("[DEBUG] Checking .env file contents...")
+        _, stdout_env, _ = ssh.exec_command(f"cat {remote_compose_dir}/.env", get_pty=True)
+        env_contents = stdout_env.read().decode().strip()
+        print(f"[DEBUG] .env file contents:\n{env_contents}")
+
+        # Clean up the ZIP file
+        ssh.exec_command(f"rm -f {remote_zip_path}")
+
+        # Start services in the background
+        print("[INFO] Starting deployment services...")
+        start_cmd = f"cd {remote_compose_dir} && nohup docker compose up -d & sleep 0.1"
+        _, stdout, stderr = ssh.exec_command(start_cmd, get_pty=True)
+
+        # Capture the command output for diagnostics
+        stdout_output = stdout.read().decode().strip()
+        stderr_output = stderr.read().decode().strip()
+        start_exit_status = stdout.channel.recv_exit_status()
+
+        print(f"[DEBUG] Docker compose stdout: {stdout_output}")
+        print(f"[DEBUG] Docker compose stderr: {stderr_output}")
+        print(f"[DEBUG] Docker compose exit status: {start_exit_status}")
+
+        # Note: with nohup in the background, the exit status may not reflect the actual docker compose status
+        # The command should 
succeed if the nohup command itself launches properly + print("[INFO] Docker compose command launched in background") + + # Give a moment for services to start + time.sleep(2) + + # Check if containers are starting + print("[DEBUG] Checking if containers are starting...") + _, stdout_ps, _ = ssh.exec_command(f"cd {remote_compose_dir} && docker compose ps", get_pty=True) + ps_output = stdout_ps.read().decode().strip() + print(f"[DEBUG] Initial container status:\n{ps_output}") + + print("[INFO] Docker compose launched successfully") + ssh.close() - print("[INFO] SSH connection closed.") + print("[INFO] Deployment initiated successfully.") + return { - "status": "success", - "message": "docker-compose.zip uploaded and directory ensured.", + "status": "In Progress", + "message": "Deployment initiated successfully", "compose_dir": remote_compose_dir, - "remote_zip_path": remote_zip_path + "remote_zip_path": remote_zip_path, + "logs": ["Deployment initiated successfully"] } + except Exception as e: - print(f"[ERROR] An error: {e}") - return {"error": str(e)} + print(f"[ERROR] Deployment failed: {e}") + return { + "error": str(e), + "logs": [f"Deployment failed: {str(e)}"] + } finally: if temp_dir: clean_up_temp_dir(temp_dir) +def stop_existing_deployment(ssh, compose_dir): + """Stop existing deployment services""" + try: + # Stop docker compose services - use synchronous approach to ensure completion + print("[DEBUG] Stopping existing services...") + _, stdout, stderr = ssh.exec_command(f"cd {compose_dir} && docker compose down --remove-orphans", get_pty=True) + exit_status = stdout.channel.recv_exit_status() + + if exit_status != 0: + error_output = stderr.read().decode().strip() + print(f"[WARNING] Error stopping services: {error_output}") + else: + print("[DEBUG] Docker compose down completed successfully") + + # Remove any dangling containers that might conflict + print("[DEBUG] Removing dangling containers...") + ssh.exec_command("docker container prune -f", get_pty=True) + + # Clean up old files after stopping services + print("[DEBUG] Cleaning up old deployment files...") + ssh.exec_command(f"cd {compose_dir} && rm -f .env app.nginx.conf.template compose.yaml workflow-info.json", get_pty=True) + print("[INFO] Existing deployment stopped and cleaned up.") + + except Exception as e: + print(f"[ERROR] Error stopping existing deployment: {e}") + raise Exception(f"Failed to stop existing deployment: {str(e)}") + def create_zip_locally(request, hostname): temp_dir = tempfile.mkdtemp() env_file_path = os.path.join(temp_dir, ".env") @@ -85,6 +206,10 @@ def create_zip_locally(request, hostname): try: with open(env_file_path, 'w') as f: f.write(f"public_host_ip={hostname}\n") + # Add proxy environment variables with empty defaults to avoid Docker warnings + f.write("http_proxy=\n") + f.write("https_proxy=\n") + f.write("no_proxy=\n") for key, value in ports_info.items(): f.write(f"{key}={value}\n") @@ -144,4 +269,201 @@ def clean_up_temp_dir(dir_path: str): try: shutil.rmtree(dir_path) except Exception as e: - logging.exception(f"An error occurred while deleting the temp directory {dir_path}.") \ No newline at end of file + logging.exception(f"An error occurred while deleting the temp directory {dir_path}.") + +async def async_check_deployment_status(remote_host, remote_user, compose_dir="genaistudio-compose"): + """ + Async wrapper for checking deployment status + """ + try: + # Run the synchronous status check function in a thread pool + loop = asyncio.get_event_loop() + with 
concurrent.futures.ThreadPoolExecutor() as executor:
+            result = await loop.run_in_executor(
+                executor,
+                check_deployment_status,
+                remote_host,
+                remote_user,
+                compose_dir
+            )
+        return result
+    except Exception as e:
+        return {
+            "status": "Error",
+            "message": f"Async status check failed: {str(e)}",
+            "logs": []
+        }
+
+def check_deployment_status(remote_host, remote_user, compose_dir="genaistudio-compose"):
+    """Check deployment status using docker ps commands"""
+    ssh_check = None
+    try:
+        ssh_check = paramiko.SSHClient()
+        ssh_check.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+        ssh_check.connect(remote_host, username=remote_user)
+
+        # Check if the deployment directory exists
+        _, stdout, _ = ssh_check.exec_command(f"ls -d {compose_dir}")
+        if stdout.channel.recv_exit_status() != 0:
+            return {"status": "In Progress", "message": "Preparing deployment environment...", "logs": []}
+
+        # Check if compose.yaml exists
+        _, stdout_compose, _ = ssh_check.exec_command(f"ls {compose_dir}/compose.yaml")
+        if stdout_compose.channel.recv_exit_status() != 0:
+            return {"status": "In Progress", "message": "Uploading deployment files...", "logs": []}
+
+        # Check if nohup.out exists yet (it is removed while an old deployment is being torn down)
+        _, stdout_nohup, _ = ssh_check.exec_command(f"ls {compose_dir}/nohup.out")
+        if stdout_nohup.channel.recv_exit_status() != 0:
+            return {"status": "In Progress", "message": "Stopping existing services...", "logs": ['Stopping existing services...']}
+
+        # Get the number of services defined in compose.yaml
+        _, stdout_num, stderr_num = ssh_check.exec_command(f"cd {compose_dir} && docker compose config --services | wc -l")
+        num_exit_status = stdout_num.channel.recv_exit_status()
+        if num_exit_status != 0:
+            error_msg = stderr_num.read().decode().strip()
+            return {"status": "Error", "message": f"Failed to read compose configuration: {error_msg}", "logs": []}
+
+        num_services_output = stdout_num.read().decode().strip().splitlines()
+        num_services_lines = [line for line in num_services_output if not line.startswith('WARN') and line.strip()]
+        num_services_str = num_services_lines[-1] if num_services_lines else '0'
+
+        # Run docker compose ps to get service status
+        _, stdout_ps, stderr_ps = ssh_check.exec_command(f"cd {compose_dir} && docker compose ps --all --format json")
+        ps_exit_status = stdout_ps.channel.recv_exit_status()
+        if ps_exit_status != 0:
+            error_msg = stderr_ps.read().decode().strip()
+            return {"status": "In Progress", "message": f"Starting deployment services... 
({error_msg})", "logs": []} + + out = stdout_ps.read().decode() + json_lines = [line for line in out.strip().splitlines() if line.strip() and not line.strip().startswith('WARN')] + + all_services = [] + for line in json_lines: + try: + ps_data = json.loads(line) + if isinstance(ps_data, dict): + all_services.append(ps_data) + except Exception as e: + print(f"[ERROR] Failed to parse docker compose ps output: {line}") + continue + + # Read deployment logs if available + _, stdout_logs, _ = ssh_check.exec_command(f"cd {compose_dir} && tail -n 10 nohup.out 2>/dev/null || echo 'No logs yet'") + log_lines = stdout_logs.read().decode().splitlines() + + # Check for error patterns in logs + error_patterns = [ + "Error response from daemon", + "dependency failed to start", + "Container.*Error", + "failed to start", + "Cannot start service" + ] + + log_text = '\n'.join(log_lines) + for pattern in error_patterns: + import re + if re.search(pattern, log_text, re.IGNORECASE): + return { + "status": "Error", + "message": log_text, + "logs": log_lines, + "services": all_services + } + + # Analyze service statuses + services_running = 0 + services_exited = 0 + services_unhealthy = 0 + expected_completed_containers = [ + "model-downloader", "downloader", "init", "setup", "migrate", "seed" + ] + + for service in all_services: + state = service.get("State", "") + name = service.get("Name", "").lower() + health = service.get("Health", "") + + if state == "running": + if health == "unhealthy": + services_unhealthy += 1 + else: + services_running += 1 + elif state == "exited": + # Check if this is an expected completed container + is_expected_completed = any(keyword in name for keyword in expected_completed_containers) + if not is_expected_completed: + services_exited += 1 + + total_defined = int(num_services_str) + services_expected_completed = len([s for s in all_services if s.get("State") == "exited" + and any(keyword in s.get("Name", "").lower() + for keyword in expected_completed_containers)]) + + # Determine overall status + if services_unhealthy > 0: + # Get detailed error info for unhealthy services + unhealthy_services = [s for s in all_services if s.get("Health") == "unhealthy"] + error_details = [] + for service in unhealthy_services: + service_name = service.get("Name", "unknown") + error_details.append(f"Service {service_name} is unhealthy") + + return { + "status": "Error", + "message": f"{services_unhealthy} services are unhealthy: {', '.join(error_details)}", + "logs": log_lines, + "services": all_services + } + elif services_exited > 0: + # Get detailed error info for exited services + exited_services = [s for s in all_services if s.get("State") == "exited" + and not any(keyword in s.get("Name", "").lower() + for keyword in expected_completed_containers)] + error_details = [] + for service in exited_services: + service_name = service.get("Name", "unknown") + exit_code = service.get("ExitCode", "unknown") + error_details.append(f"{service_name}(exit:{exit_code})") + + return { + "status": "Error", + "message": f"{services_exited} services failed: {', '.join(error_details)}", + "logs": log_lines, + "services": all_services + } + elif (services_running + services_expected_completed) == total_defined and services_running > 0: + return { + "status": "Success", + "message": f"All {total_defined} services are running successfully. 
Application accessible at http://{remote_host}:8090", + "logs": log_lines, + "services": all_services + } + elif len(all_services) < total_defined: + return { + "status": "In Progress", + "message": f"Starting services... ({len(all_services)}/{total_defined} services created)", + "logs": log_lines, + "services": all_services + } + else: + return { + "status": "In Progress", + "message": f"Services starting... ({services_running} running, {services_expected_completed} completed)", + "logs": log_lines, + "services": all_services + } + + except Exception as e: + return { + "status": "Error", + "message": f"Status check failed: {str(e)}", + "logs": [] + } + finally: + if ssh_check: + try: + ssh_check.close() + except: + pass \ No newline at end of file diff --git a/studio-frontend/package.json b/studio-frontend/package.json index c0b8f4c..115e3d0 100644 --- a/studio-frontend/package.json +++ b/studio-frontend/package.json @@ -58,7 +58,8 @@ "sqlite3" ], "overrides": { - "set-value": "^3.0.3" + "set-value": "^3.0.3", + "form-data": "4.0.4" } }, "engines": { @@ -79,7 +80,8 @@ "esbuild": ">=0.25.0", "cross-spawn": ">=7.0.5", "solid-js": ">=1.9.4", - "tar-fs": ">=3.0.8" + "tar-fs": ">=3.0.8", + "form-data": "4.0.4" }, "eslintIgnore": [ "**/dist", diff --git a/studio-frontend/packages/server/src/controllers/chatflows/index.ts b/studio-frontend/packages/server/src/controllers/chatflows/index.ts index d8166b2..fe13033 100644 --- a/studio-frontend/packages/server/src/controllers/chatflows/index.ts +++ b/studio-frontend/packages/server/src/controllers/chatflows/index.ts @@ -285,6 +285,57 @@ const getPublicKey = async (req: Request, res: Response, next: NextFunction) => } } +const getDeploymentStatus = async (req: Request, res: Response, next: NextFunction) => { + try { + if (typeof req.params === 'undefined' || !req.params.id) { + throw new InternalFlowiseError(StatusCodes.PRECONDITION_FAILED, `Error: chatflowsRouter.getDeploymentStatus - id not provided!`) + } + const chatflow = await chatflowsService.getChatflowById(req.params.id) + if (!chatflow) { + return res.status(404).json({ error: 'Chatflow not found' }) + } + + let config = null + let logs = [] + + try { + config = chatflow.deploymentConfig ? JSON.parse(chatflow.deploymentConfig) : null + } catch (e) { + console.error('Error parsing deploymentConfig:', e) + } + + try { + logs = chatflow.deploymentLogs ? 
JSON.parse(chatflow.deploymentLogs) : [] + } catch (e) { + console.error('Error parsing deploymentLogs:', e) + } + + const response = { + status: chatflow.deploymentStatus || 'Not Started', + message: '', + config: config, + logs: logs + } + + return res.json(response) + } catch (error) { + next(error) + } +} + +const updateDeploymentStatus = async (req: Request, res: Response, next: NextFunction) => { + try { + if (typeof req.params === 'undefined' || !req.params.id) { + throw new InternalFlowiseError(StatusCodes.PRECONDITION_FAILED, `Error: chatflowsRouter.updateDeploymentStatus - id not provided!`) + } + const { status, message, logs, config } = req.body + const result = await chatflowsService.updateDeploymentStatus(req.params.id, status, message, logs, config) + return res.json(result) + } catch (error) { + next(error) + } +} + const oneClickDeployment = async (req: Request, res: Response, next: NextFunction) => { console.log('Deploying one click') try { @@ -316,5 +367,7 @@ export default { stopChatflowSandbox, buildDeploymentPackage, getPublicKey, + getDeploymentStatus, + updateDeploymentStatus, oneClickDeployment } diff --git a/studio-frontend/packages/server/src/database/entities/ChatFlow.ts b/studio-frontend/packages/server/src/database/entities/ChatFlow.ts index 707a73f..a8655ca 100644 --- a/studio-frontend/packages/server/src/database/entities/ChatFlow.ts +++ b/studio-frontend/packages/server/src/database/entities/ChatFlow.ts @@ -58,6 +58,15 @@ export class ChatFlow implements IChatFlow { @Column({nullable: true, type: 'text'}) sandboxDebugLogsUrl?: string + @Column({nullable: true, type: 'text'}) + deploymentStatus?: string + + @Column({nullable: true, type: 'text'}) + deploymentConfig?: string + + @Column({nullable: true, type: 'text'}) + deploymentLogs?: string + @Column({ type: 'timestamp' }) @CreateDateColumn() createdDate: Date diff --git a/studio-frontend/packages/server/src/database/migrations/mysql/1754700956637-AddDeploymentStatusToChatFlow.ts b/studio-frontend/packages/server/src/database/migrations/mysql/1754700956637-AddDeploymentStatusToChatFlow.ts new file mode 100644 index 0000000..0661dc3 --- /dev/null +++ b/studio-frontend/packages/server/src/database/migrations/mysql/1754700956637-AddDeploymentStatusToChatFlow.ts @@ -0,0 +1,26 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddDeploymentStatusToChatFlow1754700956637 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + const deploymentStatusExists = await queryRunner.hasColumn('chat_flow', 'deploymentStatus') + if (!deploymentStatusExists) { + await queryRunner.query(`ALTER TABLE \`chat_flow\` ADD COLUMN \`deploymentStatus\` varchar(255) DEFAULT NULL;`) + } + + const deploymentConfigExists = await queryRunner.hasColumn('chat_flow', 'deploymentConfig') + if (!deploymentConfigExists) { + await queryRunner.query(`ALTER TABLE \`chat_flow\` ADD COLUMN \`deploymentConfig\` TEXT DEFAULT NULL;`) + } + + const deploymentLogsExists = await queryRunner.hasColumn('chat_flow', 'deploymentLogs') + if (!deploymentLogsExists) { + await queryRunner.query(`ALTER TABLE \`chat_flow\` ADD COLUMN \`deploymentLogs\` TEXT DEFAULT NULL;`) + } + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE \`chat_flow\` DROP COLUMN \`deploymentStatus\`;`) + await queryRunner.query(`ALTER TABLE \`chat_flow\` DROP COLUMN \`deploymentConfig\`;`) + await queryRunner.query(`ALTER TABLE \`chat_flow\` DROP COLUMN \`deploymentLogs\`;`) + } +} 
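
Note on the new status API: together with the three new columns, the controller handlers above give clients a polling alternative to the WebSocket monitor. The sketch below shows one way the GET endpoint could be consumed; only the route path and the { status, message, config, logs } response shape come from getDeploymentStatus in this diff, while the helper name, the /api/v1 prefix, the interval, and the timeout are assumptions for illustration.

// Hypothetical poller for GET /chatflows/deployment-status/:id.
type DeploymentStatusResponse = {
    status: string // 'Not Started' | 'In Progress' | 'Success' | 'Error'
    message: string
    config: Record<string, unknown> | null
    logs: string[]
}

async function pollDeploymentStatus(
    baseUrl: string,
    chatflowId: string,
    intervalMs = 2000,
    timeoutMs = 10 * 60 * 1000
): Promise<DeploymentStatusResponse> {
    const deadline = Date.now() + timeoutMs
    while (Date.now() < deadline) {
        const res = await fetch(`${baseUrl}/api/v1/chatflows/deployment-status/${chatflowId}`)
        if (!res.ok) throw new Error(`Status check failed: HTTP ${res.status}`)
        const body = (await res.json()) as DeploymentStatusResponse
        // 'Success' and 'Error' are the terminal states written by the backend.
        if (body.status === 'Success' || body.status === 'Error') return body
        await new Promise((resolve) => setTimeout(resolve, intervalMs))
    }
    throw new Error('Timed out waiting for deployment to reach a terminal state')
}

The 2-second default mirrors the cadence the backend's own WebSocket monitor uses between docker ps checks.
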
diff --git a/studio-frontend/packages/server/src/database/migrations/mysql/index.ts b/studio-frontend/packages/server/src/database/migrations/mysql/index.ts index 611afb6..3645d89 100644 --- a/studio-frontend/packages/server/src/database/migrations/mysql/index.ts +++ b/studio-frontend/packages/server/src/database/migrations/mysql/index.ts @@ -29,6 +29,7 @@ import { AddArtifactsToChatMessage1726156258465 } from './1726156258465-AddArtif import { AddStudioFieldsToChatFlow1733282099772 } from './1733282099772-AddStudioFieldsToChatFlow' import { AddSandboxTracerUrlToChatFlow1743740099772 } from './1743740099772-AddSandboxTracerUrlToChatFlow' import { AddSandboxDebugLogsUrlToChatFlow1749612373191 } from './1749612373191-AddSandboxDebugLogsUrlToChatFlow' +import { AddDeploymentStatusToChatFlow1754700956637 } from './1754700956637-AddDeploymentStatusToChatFlow' export const mysqlMigrations = [ @@ -62,5 +63,6 @@ export const mysqlMigrations = [ AddArtifactsToChatMessage1726156258465, AddStudioFieldsToChatFlow1733282099772, AddSandboxTracerUrlToChatFlow1743740099772, - AddSandboxDebugLogsUrlToChatFlow1749612373191 + AddSandboxDebugLogsUrlToChatFlow1749612373191, + AddDeploymentStatusToChatFlow1754700956637 ] diff --git a/studio-frontend/packages/server/src/database/migrations/sqlite/1754700956637-AddDeploymentStatusToChatFlow.ts b/studio-frontend/packages/server/src/database/migrations/sqlite/1754700956637-AddDeploymentStatusToChatFlow.ts new file mode 100644 index 0000000..0661dc3 --- /dev/null +++ b/studio-frontend/packages/server/src/database/migrations/sqlite/1754700956637-AddDeploymentStatusToChatFlow.ts @@ -0,0 +1,26 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddDeploymentStatusToChatFlow1754700956637 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + const deploymentStatusExists = await queryRunner.hasColumn('chat_flow', 'deploymentStatus') + if (!deploymentStatusExists) { + await queryRunner.query(`ALTER TABLE \`chat_flow\` ADD COLUMN \`deploymentStatus\` varchar(255) DEFAULT NULL;`) + } + + const deploymentConfigExists = await queryRunner.hasColumn('chat_flow', 'deploymentConfig') + if (!deploymentConfigExists) { + await queryRunner.query(`ALTER TABLE \`chat_flow\` ADD COLUMN \`deploymentConfig\` TEXT DEFAULT NULL;`) + } + + const deploymentLogsExists = await queryRunner.hasColumn('chat_flow', 'deploymentLogs') + if (!deploymentLogsExists) { + await queryRunner.query(`ALTER TABLE \`chat_flow\` ADD COLUMN \`deploymentLogs\` TEXT DEFAULT NULL;`) + } + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE \`chat_flow\` DROP COLUMN \`deploymentStatus\`;`) + await queryRunner.query(`ALTER TABLE \`chat_flow\` DROP COLUMN \`deploymentConfig\`;`) + await queryRunner.query(`ALTER TABLE \`chat_flow\` DROP COLUMN \`deploymentLogs\`;`) + } +} diff --git a/studio-frontend/packages/server/src/database/migrations/sqlite/index.ts b/studio-frontend/packages/server/src/database/migrations/sqlite/index.ts index 09149fd..c9ed343 100644 --- a/studio-frontend/packages/server/src/database/migrations/sqlite/index.ts +++ b/studio-frontend/packages/server/src/database/migrations/sqlite/index.ts @@ -28,6 +28,7 @@ import { AddCustomTemplate1725629836652 } from './1725629836652-AddCustomTemplat import { AddStudioFieldsToChatFlow1733282099772 } from './1733282099772-AddStudioFieldsToChatFlow' import { AddSandboxTracerUrlToChatFlow1743740099772 } from './1743740099772-AddSandboxTracerUrlToChatFlow' 
import { AddSandboxDebugLogsUrlToChatFlow1749612373191 } from './1749612373191-AddSandboxDebugLogsUrlToChatFlow' +import { AddDeploymentStatusToChatFlow1754700956637 } from './1754700956637-AddDeploymentStatusToChatFlow' export const sqliteMigrations = [ Init1693835579790, @@ -59,5 +60,6 @@ export const sqliteMigrations = [ AddCustomTemplate1725629836652, AddStudioFieldsToChatFlow1733282099772, AddSandboxTracerUrlToChatFlow1743740099772, - AddSandboxDebugLogsUrlToChatFlow1749612373191 + AddSandboxDebugLogsUrlToChatFlow1749612373191, + AddDeploymentStatusToChatFlow1754700956637 ] diff --git a/studio-frontend/packages/server/src/routes/chatflows/index.ts b/studio-frontend/packages/server/src/routes/chatflows/index.ts index c0bbe13..669aaf1 100644 --- a/studio-frontend/packages/server/src/routes/chatflows/index.ts +++ b/studio-frontend/packages/server/src/routes/chatflows/index.ts @@ -10,11 +10,13 @@ router.post('/importchatflows', chatflowsController.importChatflows) // READ router.get('/pubkey', chatflowsController.getPublicKey) router.get('/', chatflowsController.getAllChatflowsbyUserId) +router.get('/deployment-status/:id', chatflowsController.getDeploymentStatus) router.get(['/', '/:id'], chatflowsController.getChatflowById) router.get(['/apikey/', '/apikey/:apikey'], chatflowsController.getChatflowByApiKey) // UPDATE router.put(['/', '/:id'], chatflowsController.updateChatflow) +router.put('/deployment-status/:id', chatflowsController.updateDeploymentStatus) // DELETE router.delete(['/', '/:id'], chatflowsController.deleteChatflow) diff --git a/studio-frontend/packages/server/src/services/chatflows/index.ts b/studio-frontend/packages/server/src/services/chatflows/index.ts index 2fcfac5..4ae6c1d 100644 --- a/studio-frontend/packages/server/src/services/chatflows/index.ts +++ b/studio-frontend/packages/server/src/services/chatflows/index.ts @@ -504,10 +504,14 @@ const oneClickDeploymentService = async (chatflowId: string, deploymentConfig: R try { const chatflow = await generatePipelineJson(chatflowId) const studioServerUrl = STUDIO_SERVER_URL - const endpoint = 'studio-backend/upload-pipeline-files' - console.log('chatflow', JSON.stringify(chatflow)) - console.log('studioServerUrl', studioServerUrl) - console.log('deploymentConfig', deploymentConfig) + const endpoint = 'studio-backend/click-deployment' + // console.log('chatflow', JSON.stringify(chatflow)) + // console.log('studioServerUrl', studioServerUrl) + // console.log('deploymentConfig', deploymentConfig) + + // Update chatflow with deployment status and config from backend response + const appServer = getRunningExpressApp() + const chatflowEntity = await appServer.AppDataSource.getRepository(ChatFlow).findOneBy({ id: chatflowId }) const response = await axios.post(`${studioServerUrl}/${endpoint}`, { remoteHost: deploymentConfig.hostname, remoteUser: deploymentConfig.username, @@ -516,8 +520,24 @@ const oneClickDeploymentService = async (chatflowId: string, deploymentConfig: R headers: { 'Content-Type': 'application/json' }, timeout: 60 * 1000 }) + if (chatflowEntity) { + chatflowEntity.deploymentStatus = response.data.status + chatflowEntity.deploymentConfig = JSON.stringify(deploymentConfig) + chatflowEntity.deploymentLogs = JSON.stringify([response.data.message]) + await appServer.AppDataSource.getRepository(ChatFlow).save(chatflowEntity) + } + return response.data } catch (error: unknown) { + // Update chatflow with error status + const appServer = getRunningExpressApp() + const chatflowEntity = await 
appServer.AppDataSource.getRepository(ChatFlow).findOneBy({ id: chatflowId })
+        if (chatflowEntity) {
+            chatflowEntity.deploymentStatus = 'Error'
+            chatflowEntity.deploymentLogs = JSON.stringify([`Error: ${error instanceof Error ? error.message : String(error)}`])
+            await appServer.AppDataSource.getRepository(ChatFlow).save(chatflowEntity)
+        }
+
         if (error instanceof Error) {
             console.error(`Error: ${error.stack}`)
         } else {
@@ -527,6 +547,45 @@
     }
 }
 
+const updateDeploymentStatus = async (chatflowId: string, status: string, message?: string, logs?: string[], config?: Record<string, any>) => {
+    try {
+        const appServer = getRunningExpressApp()
+        const chatflow = await appServer.AppDataSource.getRepository(ChatFlow).findOneBy({ id: chatflowId })
+        if (chatflow) {
+            chatflow.deploymentStatus = status
+
+            // Update logs array - either use provided logs or append message to existing logs
+            let updatedLogs: string[] = []
+            if (logs && logs.length > 0) {
+                updatedLogs = logs
+            } else {
+                // Parse existing logs and append new message
+                try {
+                    const existingLogs = chatflow.deploymentLogs ? JSON.parse(chatflow.deploymentLogs) : []
+                    updatedLogs = Array.isArray(existingLogs) ? existingLogs : []
+                    if (message) {
+                        updatedLogs.push(message)
+                    }
+                } catch (e) {
+                    updatedLogs = message ? [message] : []
+                }
+            }
+            chatflow.deploymentLogs = JSON.stringify(updatedLogs)
+
+            if (config) {
+                chatflow.deploymentConfig = JSON.stringify(config)
+            }
+            await appServer.AppDataSource.getRepository(ChatFlow).save(chatflow)
+        }
+        return chatflow
+    } catch (error) {
+        throw new InternalFlowiseError(
+            StatusCodes.INTERNAL_SERVER_ERROR,
+            `Error: chatflowsService.updateDeploymentStatus - ${getErrorMessage(error)}`
+        )
+    }
+}
+
 const _checkAndUpdateDocumentStoreUsage = async (chatflow: ChatFlow) => {
     const parsedFlowData: IReactFlowObject = JSON.parse(chatflow.flowData)
     const nodes = parsedFlowData.nodes
@@ -556,5 +615,6 @@ export default {
     stopChatflowSandboxService,
     buildDeploymentPackageService,
     getSinglePublicChatbotConfig,
-    oneClickDeploymentService
+    oneClickDeploymentService,
+    updateDeploymentStatus
 }
diff --git a/studio-frontend/packages/ui/src/api/chatflows.js b/studio-frontend/packages/ui/src/api/chatflows.js
index be642a4..5706d7c 100644
--- a/studio-frontend/packages/ui/src/api/chatflows.js
+++ b/studio-frontend/packages/ui/src/api/chatflows.js
@@ -37,6 +37,10 @@ const getPublicKey = () => client.get('/chatflows/pubkey')
 
 const clickDeployment = (id, body) => client.post(`chatflows-sandbox/one-click-deployment/${id}`, body)
 
+const getDeploymentStatus = (id) => client.get(`/chatflows/deployment-status/${id}`)
+
+const updateDeploymentStatus = (id, body) => client.put(`/chatflows/deployment-status/${id}`, body)
+
 export default {
     getAllChatflows,
     getAllAgentflows,
@@ -55,5 +59,7 @@ export default {
     stopSandbox,
     buildDeploymentPackage,
     getPublicKey,
-    clickDeployment
+    clickDeployment,
+    getDeploymentStatus,
+    updateDeploymentStatus
 }
diff --git a/studio-frontend/packages/ui/src/ui-component/dialog/OneClickDeploymentDialog.jsx b/studio-frontend/packages/ui/src/ui-component/dialog/OneClickDeploymentDialog.jsx
index 924160d..1d139b1 100644
--- a/studio-frontend/packages/ui/src/ui-component/dialog/OneClickDeploymentDialog.jsx
+++ b/studio-frontend/packages/ui/src/ui-component/dialog/OneClickDeploymentDialog.jsx
@@ -1,6 +1,6 @@
 import { createPortal } from 'react-dom';
 import { useDispatch } from 'react-redux';
-import { useEffect, useState, useRef } 
from 'react'; +import { useEffect, useState } from 'react'; import PropTypes from 'prop-types'; import { Box, @@ -19,94 +19,36 @@ import { import { StyledButton } from '@/ui-component/button/StyledButton'; import { IconCopy, IconCheck } from '@tabler/icons-react'; import { - closeSnackbar as closeSnackbarAction, enqueueSnackbar as enqueueSnackbarAction, HIDE_CANVAS_DIALOG, SHOW_CANVAS_DIALOG } from '@/store/actions'; import chatflowsApi from '@/api/chatflows'; -const OneClickDeploymentDialog = ({ show, dialogProps, onCancel, onConfirm, deployStatus, setDeployStatus, deploymentConfig, setDeploymentConfig, deployWebSocket, setDeployWebSocket }) => { +const OneClickDeploymentDialog = ({ show, dialogProps, onCancel, onConfirm, deployStatus, setDeployStatus, deploymentConfig, setDeploymentConfig, deployWebSocket }) => { const portalElement = document.getElementById('portal'); const dispatch = useDispatch(); const [pubkey, setPubkey] = useState(''); const [copied, setCopied] = useState(false); const [deploying, setDeploying] = useState(false); - const [deploymentCompleted, setDeploymentCompleted] = useState(false); - // Remove local ws state - use the persistent one from parent - const wsRef = useRef(deployWebSocket); - const deploymentCompletedRef = useRef(deploymentCompleted); - // Sync the ref when the parent WebSocket changes + // Monitor deployment status and WebSocket state from parent useEffect(() => { - wsRef.current = deployWebSocket; - deploymentCompletedRef.current = deploymentCompleted; if (deployWebSocket && deployWebSocket.readyState === WebSocket.OPEN) { setDeploying(true); - // Set up event handlers for the existing WebSocket - deployWebSocket.onmessage = (event) => { - let data; - try { data = JSON.parse(event.data); } catch { return; } - console.log('WebSocket message:', data); - if (data.status === 'Done') { - setDeployStatus(['Success', ...(data.success || '').split(',').map(line => line.trim())]); - setDeploying(false); - setDeploymentCompleted(true); - deploymentCompletedRef.current = true; - // Clean up WebSocket on completion - if (wsRef.current) { - wsRef.current.close(); - wsRef.current = null; - setDeployWebSocket(null); - } - } else if (data.status === 'Error') { - let lines = []; - if (Array.isArray(data.error)) { - lines = data.error; - } else if (typeof data.error === 'string') { - lines = data.error.split(',').map(line => line.trim()); - } else { - lines = ['Unknown error']; - } - setDeployStatus(['Error', ...lines]); - setDeploying(false); - setDeploymentCompleted(true); - deploymentCompletedRef.current = true; - // Clean up WebSocket on error - if (wsRef.current) { - wsRef.current.close(); - wsRef.current = null; - setDeployWebSocket(null); - } - } else if (data.status === 'In Progress') { - setDeployStatus(['Info', data.nohup_out]); - } else if (data.status === 'Preparing') { - setDeployStatus(['Info', data.message]); - } - }; - deployWebSocket.onerror = (error) => { - console.error('WebSocket error:', error); - setDeployStatus(['Error', 'WebSocket connection error']); + } else { + // Check if deployment has completed + if (deployStatus && (deployStatus[0] === 'Success' || deployStatus[0] === 'Error')) { setDeploying(false); - }; - deployWebSocket.onclose = (event) => { - console.log('WebSocket closed:', event.code, event.reason); - wsRef.current = null; - setDeployWebSocket(null); - // Only show error if deployment was still in progress and not completed successfully - if (deploying && !deploymentCompletedRef.current) { - setDeployStatus(['Error', 'Connection 
lost during deployment']); - setDeploying(false); - } - }; + } } - }, [deployWebSocket, setDeployWebSocket, setDeployStatus, deploying, deploymentCompleted]); + }, [deployWebSocket, deployStatus]); useEffect(() => { if (show) { dispatch({ type: SHOW_CANVAS_DIALOG }); - setDeploymentCompleted(false); // Reset completion flag when dialog opens - deploymentCompletedRef.current = false; // Reset ref too + + // Load public key chatflowsApi.getPublicKey().then(response => { if (response.error) { dispatch(enqueueSnackbarAction({ @@ -118,10 +60,51 @@ const OneClickDeploymentDialog = ({ show, dialogProps, onCancel, onConfirm, depl } }); + // Check if there's an ongoing deployment by fetching chatflow data + if (dialogProps.id) { + // Add a small delay before checking to avoid race conditions + setTimeout(() => { + chatflowsApi.getSpecificChatflow(dialogProps.id).then(response => { + if (response.data && response.data.deploymentStatus) { + // Parse existing deployment config if available + if (response.data.deploymentConfig) { + try { + const existingConfig = JSON.parse(response.data.deploymentConfig); + setDeploymentConfig(existingConfig); + } catch (e) { + console.warn('Failed to parse existing deployment config'); + } + } + + if (response.data.deploymentStatus === 'In Progress') { + setDeploying(true); + // Use existing deployment logs instead of generic reconnecting message + const logs = response.data.deploymentLogs ? + JSON.parse(response.data.deploymentLogs) : + ['Deployment in progress...']; + const logText = logs.length > 0 ? logs.join('\n') : 'Deployment in progress...'; + setDeployStatus(['Info', logText]); + } else if (response.data.deploymentStatus === 'Success' || response.data.deploymentStatus === 'Error') { + // Deployment already completed, just show the final status + const finalStatus = response.data.deploymentStatus; + // For existing deployments, get message from logs (smart content selection) + const logs = response.data.deploymentLogs ? + JSON.parse(response.data.deploymentLogs) : + [finalStatus === 'Success' ? 'Deployment completed successfully' : 'Deployment failed']; + const message = logs[0] || (finalStatus === 'Success' ? 
'Deployment completed successfully' : 'Deployment failed'); + setDeployStatus([finalStatus, message]); + setDeploying(false); + } + } + }).catch(error => { + console.error('Failed to check deployment status:', error); + }); + }, 500); // 500ms delay to avoid race conditions + } + // When modal reopens, check if there's a WebSocket still running - if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN) { + if (deployWebSocket && deployWebSocket.readyState === WebSocket.OPEN) { setDeploying(true); // Resume showing deploying state - // WebSocket event handlers are already set up in the other useEffect } } else { dispatch({ type: HIDE_CANVAS_DIALOG }); @@ -130,16 +113,11 @@ const OneClickDeploymentDialog = ({ show, dialogProps, onCancel, onConfirm, depl } return () => { dispatch({ type: HIDE_CANVAS_DIALOG }); - // Only clean up on component unmount (when parent component unmounts) - // Parent component will handle WebSocket cleanup }; - }, [show, dispatch]); + }, [show, dispatch, dialogProps.id]); const handleCancel = () => { - // Don't clean up WebSocket - let it continue monitoring in background - // Just close the modal while keeping the deployment running - setDeploying(false); // Reset local deploying state for UI - onCancel(); // Call the parent's onCancel to close modal + onCancel(); }; const handleCopy = () => { @@ -148,86 +126,20 @@ const OneClickDeploymentDialog = ({ show, dialogProps, onCancel, onConfirm, depl setTimeout(() => setCopied(false), 1500); }; + const handleOneClickDeploy = async () => { setDeploying(true); - setDeploymentCompleted(false); // Reset completion flag - deploymentCompletedRef.current = false; // Reset ref too - setDeployStatus(['Info', 'Connecting to machine...']); + try { + // Call the parent's deployment handler - it will handle WebSocket setup const result = await onConfirm(dialogProps.id, deploymentConfig); if (result && result.error) { setDeployStatus(['Error', result.error]); setDeploying(false); return; } - const compose_dir = result?.compose_dir; - const wsUrl = `${window.location.origin.replace(/^http/, 'ws')}/studio-backend/ws/deploy-and-monitor`; - const wsInstance = new window.WebSocket(wsUrl); - - // Update parent with the WebSocket reference for persistence - wsRef.current = wsInstance; - setDeployWebSocket(wsInstance); - - wsInstance.onopen = () => { - wsInstance.send(JSON.stringify({ hostname: deploymentConfig.hostname, username: deploymentConfig.username, compose_dir: compose_dir })); - }; - wsInstance.onmessage = (event) => { - let data; - try { data = JSON.parse(event.data); } catch { return; } - console.log('WebSocket message:', data); - if (data.status === 'Done') { - setDeployStatus(['Success', ...(data.success || '').split(',').map(line => line.trim())]); - setDeploying(false); - setDeploymentCompleted(true); - deploymentCompletedRef.current = true; - // Clean up WebSocket on completion - if (wsRef.current) { - wsRef.current.close(); - wsRef.current = null; - setDeployWebSocket(null); - } - } else if (data.status === 'Error') { - let lines = []; - if (Array.isArray(data.error)) { - lines = data.error; - } else if (typeof data.error === 'string') { - lines = data.error.split(',').map(line => line.trim()); - } else { - lines = ['Unknown error']; - } - setDeployStatus(['Error', ...lines]); - setDeploying(false); - setDeploymentCompleted(true); - deploymentCompletedRef.current = true; - // Clean up WebSocket on error - if (wsRef.current) { - wsRef.current.close(); - wsRef.current = null; - setDeployWebSocket(null); - } - } else 
diff --git a/studio-frontend/packages/ui/src/ui-component/table/FlowListTable.jsx b/studio-frontend/packages/ui/src/ui-component/table/FlowListTable.jsx
index a83a3a0..bcc415a 100644
--- a/studio-frontend/packages/ui/src/ui-component/table/FlowListTable.jsx
+++ b/studio-frontend/packages/ui/src/ui-component/table/FlowListTable.jsx
@@ -99,7 +99,8 @@ export const FlowListTable = ({ data, images, isLoading, filterFunction, updateF
         // console.log('handleSortData', data);
         const sorted = [...data].map((row) => ({
             ...row,
-            sandboxStatus: row.sandboxStatus || 'Not Running' // Ensure initial status
+            sandboxStatus: row.sandboxStatus || 'Not Running', // Ensure initial sandbox status
+            deploymentStatus: row.deploymentStatus || 'Not Started' // Ensure initial deployment status
         })).sort((a, b) => {
             if (orderBy === 'name') {
                 return order === 'asc' ? (a.name || '').localeCompare(b.name || '') : (b.name || '').localeCompare(a.name || '');
@@ -149,10 +150,43 @@ export const FlowListTable = ({ data, images, isLoading, filterFunction, updateF
             return ws;
         };
         sortedData.map((row) => {
+            // Handle sandbox websockets
             if (row.sandboxStatus === 'Getting Ready' || row.sandboxStatus === 'Stopping' || row.sandboxStatus === 'Deleting existing namespace') {
                 const ws = openWebSocketConnection(row.id, row.sandboxStatus);
                 openConnections.push(ws);
             }
+
+            // Handle deployment websockets
+            if (row.deploymentStatus === 'In Progress' && (!deployWebSocketsById[row.id] || deployWebSocketsById[row.id].readyState !== WebSocket.OPEN)) {
+                console.log(`Found in-progress deployment for chatflow ${row.id}, creating websocket...`);
+
+                // Parse deployment config if available
+                let deploymentConfig = { hostname: '', username: '' };
+                if (row.deploymentConfig) {
+                    try {
+                        deploymentConfig = JSON.parse(row.deploymentConfig);
+                    } catch (e) {
+                        console.warn('Failed to parse deployment config for websocket reconnection');
+                    }
+                }
+
+                // Set initial status from existing logs
+                if (row.deploymentLogs) {
+                    try {
+                        const logs = JSON.parse(row.deploymentLogs);
+                        const logText = logs.length > 0 ? logs.join('\n') : 'Deployment in progress...';
+                        setDeployStatusForId(row.id, ['Info', logText]);
+                    } catch (e) {
+                        setDeployStatusForId(row.id, ['Info', 'Deployment in progress...']);
+                    }
+                } else {
+                    setDeployStatusForId(row.id, ['Info', 'Deployment in progress...']);
+                }
+
+                // Create websocket connection to monitor existing deployment
+                const deployWs = startClickDeploymentMonitoring(row.id, deploymentConfig);
+                openConnections.push(deployWs);
+            }
         });
         return () => {
             openConnections.forEach((ws) => {
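
The guard on the deployment branch above is what makes a page refresh survivable: the persisted `deploymentStatus`, not the in-memory socket map, decides whether monitoring must be re-established. Restated in isolation (sketch only, same names as the diff):

```js
// A row needs a fresh monitoring socket iff the database says a deployment
// is in flight and no open socket is already tracked for that row.
const needsMonitorSocket = (row, deployWebSocketsById) =>
    row.deploymentStatus === 'In Progress' &&
    (!deployWebSocketsById[row.id] ||
        deployWebSocketsById[row.id].readyState !== WebSocket.OPEN);
```

As a minor style note, `sortedData.map((row) => { ... })` is used purely for its side effects here; `forEach` would state that intent more directly.
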
@@ -178,6 +212,21 @@ export const FlowListTable = ({ data, images, isLoading, filterFunction, updateF
         );
     };
 
+    const updateDeploymentStatus = (id, newStatus, deploymentConfig = null, deploymentLogs = null) => {
+        setSortedData((prevData) =>
+            prevData.map((row) =>
+                row.id === id
+                    ? {
+                        ...row,
+                        deploymentStatus: newStatus,
+                        deploymentConfig: deploymentConfig || row.deploymentConfig,
+                        deploymentLogs: deploymentLogs || row.deploymentLogs
+                    }
+                    : row
+            )
+        );
+    };
+
     const handleRunSandbox = async (id) => {
         updateSandboxStatus(id, 'Sending Request');
         const res = await chatflowsApi.deploySandbox(id)
@@ -242,11 +291,34 @@ export const FlowListTable = ({ data, images, isLoading, filterFunction, updateF
     const oneClickDeployment = async (id, deploymentConfig) => {
         try {
-            // Only call the backend API and return the response (including compose_dir)
-            const response = await chatflowsApi.clickDeployment(id, deploymentConfig)
+            // Update table data to show loading icon immediately
+            updateDeploymentStatus(id, 'In Progress', JSON.stringify(deploymentConfig), JSON.stringify(['Deployment initiated...']));
+
+            // Proactively update database to prevent race condition on page refresh
+            await chatflowsApi.updateDeploymentStatus(id, {
+                status: 'In Progress',
+                message: 'Deployment initiated...',
+                logs: ['Deployment initiated...']
+            }).catch(error => {
+                console.error('Failed to update deployment status before API call:', error);
+            });
+
+            // Set initial deploy status
+            setDeployStatusForId(id, ['Info', 'Deployment initiated...']);
+
+            // Call the backend API
+            const response = await chatflowsApi.clickDeployment(id, deploymentConfig);
+
+            if (response.data && !response.data.error) {
+                // Start WebSocket monitoring immediately after deployment is triggered
+                startClickDeploymentMonitoring(id, deploymentConfig);
+            }
+
+            return response.data; // Pass status, message and chatflow_id back to the dialog
         } catch (error) {
+            setDeployStatusForId(id, ['Error', error?.message || 'Failed to start deployment']);
+            // Update table data to remove loading icon on error
+            updateDeploymentStatus(id, 'Error', null, JSON.stringify([error?.message || 'Failed to start deployment']));
             return { error: error?.message || 'Deployment failed' };
         }
     }
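
The ordering inside `oneClickDeployment` is deliberate: the 'In Progress' state is persisted before the backend is even called, so a refresh in the window between triggering the deployment and receiving the first WebSocket message still finds the in-flight status in the database and reconnects. Compressed into a sketch (error handling elided, so this is not a drop-in replacement):

```js
// Sequence sketch of the handler above, using the same helpers from the diff.
async function deployWithPersistedState(id, config) {
    const initial = ['Deployment initiated...'];
    // 1. Optimistic UI: flip the row to 'In Progress' so the spinner shows at once.
    updateDeploymentStatus(id, 'In Progress', JSON.stringify(config), JSON.stringify(initial));
    // 2. Persist first: the mount-time reconnect effect keys off this DB status.
    await chatflowsApi.updateDeploymentStatus(id, { status: 'In Progress', message: initial[0], logs: initial });
    // 3. Only then trigger the backend and attach the monitoring socket.
    const response = await chatflowsApi.clickDeployment(id, config);
    if (response.data && !response.data.error) {
        startClickDeploymentMonitoring(id, config);
    }
    return response.data;
}
```
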
@@ -282,6 +354,135 @@ export const FlowListTable = ({ data, images, isLoading, filterFunction, updateF
         setDeployWebSocketsById((prev) => ({ ...prev, [id]: ws }));
     };
 
+    // Monitor click deployment WebSocket connections
+    const startClickDeploymentMonitoring = useCallback((id, deploymentConfig) => {
+        // Close any existing WebSocket for this ID
+        if (deployWebSocketsById[id]) {
+            deployWebSocketsById[id].close();
+        }
+
+        const wsUrl = `${window.location.origin.replace(/^http/, 'ws')}/studio-backend/ws/monitor-click-deployment`;
+        const wsInstance = new WebSocket(wsUrl);
+
+        setDeployWebSocketForId(id, wsInstance);
+
+        wsInstance.onopen = () => {
+            console.log('[WebSocket] Connected for click deployment monitoring', id);
+            wsInstance.send(JSON.stringify({
+                hostname: deploymentConfig.hostname,
+                username: deploymentConfig.username,
+                chatflow_id: id
+            }));
+        };
+
+        wsInstance.onmessage = (event) => {
+            let data;
+            try { data = JSON.parse(event.data); } catch { return; }
+            console.log('[WebSocket] Click deployment message:', data);
+
+            if (data.status === 'Success') {
+                setDeployStatusForId(id, ['Success', data.message]);
+                // Update table data to remove loading icon
+                updateDeploymentStatus(id, 'Success', null, JSON.stringify([data.message]));
+                // Update database with final status
+                chatflowsApi.updateDeploymentStatus(id, {
+                    status: 'Success',
+                    message: data.message,
+                    logs: [data.message]
+                }).catch(error => {
+                    console.error('Failed to update deployment status in database:', error);
+                });
+                // Clean up WebSocket on completion
+                wsInstance.close();
+                setDeployWebSocketForId(id, null);
+            } else if (data.status === 'Error') {
+                const errorMessage = data.message;
+                setDeployStatusForId(id, ['Error', errorMessage]);
+                // Update table data to remove loading icon
+                updateDeploymentStatus(id, 'Error', null, JSON.stringify([errorMessage]));
+                // Update database with final status
+                chatflowsApi.updateDeploymentStatus(id, {
+                    status: 'Error',
+                    message: errorMessage,
+                    logs: [errorMessage]
+                }).catch(error => {
+                    console.error('Failed to update deployment status in database:', error);
+                });
+                // Clean up WebSocket on error
+                wsInstance.close();
+                setDeployWebSocketForId(id, null);
+            } else if (data.status === 'In Progress') {
+                const progressMessage = data.message || 'Deployment in progress...';
+                const logs = data.logs || [];
+                const logText = logs.length > 0 ? logs.join('\n') : progressMessage;
+                setDeployStatusForId(id, ['Info', logText]);
+
+                // Update database with progress logs
+                if (logs.length > 0) {
+                    chatflowsApi.updateDeploymentStatus(id, {
+                        status: 'In Progress',
+                        message: progressMessage,
+                        logs: logs
+                    }).catch(error => {
+                        console.error('Failed to update In Progress deployment status:', error);
+                    });
+                }
+            }
+        };
+
+        wsInstance.onerror = (error) => {
+            console.error('[WebSocket] Click deployment error:', error);
+            setDeployStatusForId(id, ['Error', 'Connection error during deployment monitoring']);
+            wsInstance.close();
+            setDeployWebSocketForId(id, null);
+        };
+
+        wsInstance.onclose = (event) => {
+            console.log(`[WebSocket] Click deployment closed: code=${event.code}, reason='${event.reason}', wasClean=${event.wasClean}`);
+            setDeployWebSocketForId(id, null);
+
+            // Check deployment status if abnormal closure
+            if (event.code !== 1000 && event.code !== 1001) {
+                console.log('[WebSocket] Abnormal closure detected, checking deployment status...');
+                setTimeout(async () => {
+                    try {
+                        const response = await chatflowsApi.getSpecificChatflow(id);
+                        if (response.data && response.data.deploymentStatus === 'In Progress') {
+                            setDeployStatusForId(id, ['Error', 'Connection lost during deployment']);
+                            // Update table data to remove loading icon
+                            updateDeploymentStatus(id, 'Error', null, JSON.stringify(['Connection lost during deployment']));
+                            // Update database with error status
+                            chatflowsApi.updateDeploymentStatus(id, {
+                                status: 'Error',
+                                message: 'Connection lost during deployment',
+                                logs: ['Connection lost during deployment']
+                            }).catch(error => {
+                                console.error('Failed to update deployment status in database:', error);
+                            });
+                        } else if (response.data && response.data.deploymentStatus) {
+                            // Deployment completed, update with final status
+                            const finalStatus = response.data.deploymentStatus;
+                            const logs = response.data.deploymentLogs ?
+                                JSON.parse(response.data.deploymentLogs) :
+                                [finalStatus === 'Success' ? 'Deployment completed successfully' : 'Deployment failed'];
+                            const message = logs[0] || (finalStatus === 'Success' ? 'Deployment completed successfully' : 'Deployment failed');
+                            setDeployStatusForId(id, [finalStatus, message]);
+                            // Update table data to remove loading icon
+                            updateDeploymentStatus(id, finalStatus, null, JSON.stringify(logs));
+                        }
+                    } catch (error) {
+                        console.error('Failed to check final deployment status:', error);
+                        setDeployStatusForId(id, ['Error', 'Connection lost during deployment']);
+                        // Update table data to remove loading icon
+                        updateDeploymentStatus(id, 'Error', null, JSON.stringify(['Connection lost during deployment']));
+                    }
+                }, 1000);
+            }
+        };
+
+        return wsInstance;
+    }, [deployWebSocketsById]);
+
     // Cleanup deployment WebSockets when component unmounts
     useEffect(() => {
         return () => {
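
The `onclose` handler just above treats WebSocket close codes 1000 (normal closure) and 1001 (going away) as expected shutdowns; anything else is a dropped connection, and after a one-second grace period the database is consulted for the authoritative outcome instead of assuming failure. The triage in isolation (illustrative sketch; `recheckStatus` is a hypothetical callback standing in for the `getSpecificChatflow` logic above):

```js
// RFC 6455 close codes: 1000 = normal closure, 1001 = endpoint going away.
const CLEAN_CLOSE_CODES = [1000, 1001];

const handleMonitorClose = (event, recheckStatus) => {
    if (CLEAN_CLOSE_CODES.includes(event.code)) return;
    // Abnormal drop: give the backend a moment to write its final status,
    // then ask the database which outcome actually holds.
    setTimeout(recheckStatus, 1000);
};
```
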
@@ -298,6 +499,7 @@ export const FlowListTable = ({ data, images, isLoading, filterFunction, updateF
         setSortedData(handleSortData());
     }, [data, order, orderBy]); // Run effect when any dependency changes
 
+    //
     // const handleRequestSort = (property) => {
     //     const isAsc = orderBy === property && order === 'asc';
     //     setOrder(isAsc ? 'desc' : 'asc');
@@ -672,7 +874,8 @@ export const FlowListTable = ({ data, images, isLoading, filterFunction, updateF
                                         justifyContent='center'
                                         alignItems='center'
                                     >
-                                    {deployWebSocketsById[row.id] && deployWebSocketsById[row.id].readyState === WebSocket.OPEN ? (
+                                    {row.deploymentStatus === 'In Progress' ||
+                                        (deployWebSocketsById[row.id] && deployWebSocketsById[row.id].readyState === WebSocket.OPEN) ? (