forked from morphik-org/morphik-core
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstart_server.py
More file actions
318 lines (262 loc) · 11.7 KB
/
start_server.py
File metadata and controls
318 lines (262 loc) · 11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
import argparse
import atexit
import logging
import os
import signal
import socket
import subprocess
import sys
import time
import requests
import tomli
import uvicorn
from dotenv import load_dotenv
from core.config import get_settings
from core.logging_config import setup_logging
# Global variable to store the worker process.
# Holds the subprocess.Popen handle for the ARQ worker: set by
# start_arq_worker() and terminated by cleanup_processes() at exit.
worker_process = None
def wait_for_redis(host="localhost", port=6379, timeout=20):
    """Block until a TCP connection to Redis succeeds or the timeout expires.

    Args:
        host: Redis host address
        port: Redis port number
        timeout: Maximum time to wait in seconds

    Returns:
        True if Redis becomes available within the timeout, False otherwise
    """
    logging.info(f"Waiting for Redis to be available at {host}:{port}...")
    started = time.monotonic()
    deadline = started + timeout
    while time.monotonic() < deadline:
        try:
            conn = socket.create_connection((host, port), timeout=1)
        except (OSError, socket.error):
            # Not up yet — note elapsed time at debug level and retry shortly.
            elapsed = int(time.monotonic() - started)
            logging.debug(f"Redis not available yet, retrying... ({elapsed}s elapsed)")
            time.sleep(0.3)
        else:
            conn.close()
            logging.info("Redis is accepting connections.")
            return True
    logging.error(f"Redis not reachable after {timeout}s")
    return False
def check_and_start_redis():
    """Ensure the morphik-redis Docker container exists and is running.

    Starts a stopped container or creates a new one as needed; exits the
    program if Docker is missing or any docker command fails.
    """
    try:
        # `docker ps -q` prints an ID only for a *running* container.
        if subprocess.check_output(["docker", "ps", "-q", "-f", "name=morphik-redis"]).strip():
            logging.info("Redis container (morphik-redis) is already running.")
            return

        # `docker ps -a -q` also matches stopped containers.
        if subprocess.check_output(["docker", "ps", "-a", "-q", "-f", "name=morphik-redis"]).strip():
            logging.info("Starting existing Redis container (morphik-redis)...")
            subprocess.run(["docker", "start", "morphik-redis"], check=True, capture_output=True)
            logging.info("Redis container started.")
            return

        # No container at all: create one mapped to the default Redis port.
        logging.info("Creating and starting Redis container (morphik-redis)...")
        subprocess.run(
            ["docker", "run", "-d", "--name", "morphik-redis", "-p", "6379:6379", "redis"],
            check=True,
            capture_output=True,
        )
        logging.info("Redis container created and started.")
    except subprocess.CalledProcessError as e:
        logging.error(f"Failed to manage Redis container: {e}")
        logging.error(f"Stderr: {e.stderr.decode() if e.stderr else 'N/A'}")
        sys.exit(1)
    except FileNotFoundError:
        logging.error("Docker command not found. Please ensure Docker is installed and in PATH.")
        sys.exit(1)
def start_arq_worker():
    """Start the ARQ ingestion worker as a background subprocess.

    The worker's stdout/stderr are appended to logs/worker.log, and the
    Popen handle is stored in the module-global ``worker_process`` so that
    ``cleanup_processes`` can terminate it on exit. Exits the program if
    the worker cannot be launched.
    """
    global worker_process
    try:
        logging.info("Starting ARQ worker...")

        # Ensure logs directory exists
        log_dir = os.path.join(os.getcwd(), "logs")
        os.makedirs(log_dir, exist_ok=True)

        # Worker log file paths
        worker_log_path = os.path.join(log_dir, "worker.log")

        # Deliberately not closed here: Popen keeps writing to this file
        # object for the worker's entire lifetime.
        worker_log = open(worker_log_path, "a")

        # Fix: use time.strftime instead of shelling out to the external
        # `date` binary — portable (works on Windows) and avoids spawning
        # an extra subprocess just for a timestamp.
        timestamp = time.strftime("%a %b %d %H:%M:%S %Z %Y")
        worker_log.write(f"\n\n--- Worker started at {timestamp} ---\n\n")
        worker_log.flush()

        # Use sys.executable to ensure the same Python environment is used
        worker_cmd = [sys.executable, "-m", "arq", "core.workers.ingestion_worker.WorkerSettings"]

        # Start the worker with output redirected to log files
        worker_process = subprocess.Popen(
            worker_cmd,
            stdout=worker_log,
            stderr=worker_log,
            env=dict(os.environ, PYTHONUNBUFFERED="1"),  # Ensure unbuffered output
        )
        logging.info(f"ARQ worker started with PID: {worker_process.pid}")
        logging.info(f"Worker logs available at: {worker_log_path}")
    except Exception as e:
        logging.error(f"Failed to start ARQ worker: {e}")
        sys.exit(1)
def cleanup_processes():
    """Stop the ARQ worker process on exit.

    Sends SIGTERM first and escalates to SIGKILL after a 5 second grace
    period. Writing the shutdown marker to the worker log is best-effort:
    failures there are only warned about, never fatal.
    """
    global worker_process
    if worker_process and worker_process.poll() is None:  # Check if process is still running
        logging.info(f"Stopping ARQ worker (PID: {worker_process.pid})...")

        # Log the worker termination
        try:
            log_dir = os.path.join(os.getcwd(), "logs")
            worker_log_path = os.path.join(log_dir, "worker.log")
            with open(worker_log_path, "a") as worker_log:
                # Fix: use time.strftime instead of shelling out to the
                # external `date` binary — portable and avoids spawning a
                # subprocess during interpreter shutdown.
                timestamp = time.strftime("%a %b %d %H:%M:%S %Z %Y")
                worker_log.write(f"\n\n--- Worker stopping at {timestamp} ---\n\n")
        except Exception as e:
            logging.warning(f"Could not write worker stop message to log: {e}")

        # Send SIGTERM first for graceful shutdown
        worker_process.terminate()
        try:
            # Wait a bit for graceful shutdown
            worker_process.wait(timeout=5)
            logging.info("ARQ worker stopped gracefully.")
        except subprocess.TimeoutExpired:
            logging.warning("ARQ worker did not terminate gracefully, sending SIGKILL.")
            worker_process.kill()  # Force kill if it doesn't stop
            logging.info("ARQ worker killed.")

        # Close any open pipe descriptors; these are None when output was
        # redirected to a file (as start_arq_worker does), so this is a no-op
        # in the normal path.
        if hasattr(worker_process, "stdout") and worker_process.stdout:
            worker_process.stdout.close()
        if hasattr(worker_process, "stderr") and worker_process.stderr:
            worker_process.stderr.close()

    # Optional: Add Redis container stop logic here if desired
    # try:
    #     logging.info("Stopping Redis container...")
    #     subprocess.run(["docker", "stop", "morphik-redis"], check=False, capture_output=True)
    # except Exception as e:
    #     logging.warning(f"Could not stop Redis container: {e}")
# Register the cleanup function to be called on script exit.
atexit.register(cleanup_processes)
# Also register for SIGINT (Ctrl+C) and SIGTERM: sys.exit(0) unwinds the
# interpreter normally, which runs the atexit handler above, so the worker
# is stopped on signal-driven shutdown as well.
signal.signal(signal.SIGINT, lambda sig, frame: sys.exit(0))
signal.signal(signal.SIGTERM, lambda sig, frame: sys.exit(0))
def check_ollama_running(base_url):
    """Check if Ollama is running and accessible at the given URL."""
    try:
        # The /api/tags endpoint is a cheap liveness probe for Ollama.
        response = requests.get(f"{base_url}/api/tags", timeout=2)
    except requests.RequestException:
        return False
    return response.status_code == 200
def get_ollama_usage_info():
    """Scan morphik.toml for components configured to use Ollama models.

    Returns:
        A list of {"component": ..., "base_url": ...} dicts, one per
        component whose configured model is a registered Ollama model.
        Returns an empty list if the config cannot be read or parsed.
    """
    try:
        with open("morphik.toml", "rb") as f:
            config = tomli.load(f)

        # Map each registered model key to its api_base, keeping only
        # Ollama models that actually declare an api_base.
        ollama_models = {}
        for model_key, model_cfg in config.get("registered_models", {}).items():
            if "ollama" in model_cfg.get("model_name", "") and model_cfg.get("api_base"):
                ollama_models[model_key] = model_cfg["api_base"]

        ollama_configs = []

        def record(component, model_key):
            # Record the component only when its model key resolves to an
            # Ollama model discovered above.
            if model_key in ollama_models:
                ollama_configs.append({"component": component, "base_url": ollama_models[model_key]})

        for component in ["embedding", "completion", "rules", "graph", "parser.vision"]:
            if component == "parser.vision":
                # parser.vision lives in the nested [parser.vision] table.
                if "parser" in config and "vision" in config["parser"]:
                    record(component, config["parser"]["vision"].get("model"))
            elif component in config:
                record(component, config[component].get("model"))

        # Contextual chunking has its own model key under [parser].
        parser_cfg = config.get("parser", {})
        if parser_cfg.get("use_contextual_chunking") and "contextual_chunking_model" in parser_cfg:
            record("parser.contextual_chunking", parser_cfg["contextual_chunking_model"])

        return ollama_configs
    except Exception as e:
        logging.error(f"Error checking Ollama configuration: {e}")
        return []
def main():
    """Start the Morphik server: parse args, bring up dependencies, run Uvicorn.

    Startup sequence (order matters):
      1. Parse CLI args and configure logging so later steps can log.
      2. Ensure the Redis Docker container is up.
      3. Load .env, then optionally verify the configured Ollama endpoints.
      4. Validate settings, wait for Redis TCP, start the ARQ worker.
      5. Run the blocking Uvicorn server.
    """
    # Parse command line arguments
    parser = argparse.ArgumentParser(description="Start the Morphik server")
    parser.add_argument(
        "--log",
        choices=["debug", "info", "warning", "error"],
        default="info",
        help="Set the logging level",
    )
    parser.add_argument(
        "--skip-ollama-check",
        action="store_true",
        help="Skip Ollama availability check",
    )
    args = parser.parse_args()

    # Set up logging first with specified level
    setup_logging(log_level=args.log.upper())

    # Check and start Redis container
    check_and_start_redis()

    # Load environment variables from .env file
    load_dotenv()

    # Check if Ollama is required and running
    if not args.skip_ollama_check:
        ollama_configs = get_ollama_usage_info()

        if ollama_configs:
            # Group configs by base_url to avoid redundant checks: several
            # components may share the same Ollama endpoint.
            base_urls = {}
            for config in ollama_configs:
                if config["base_url"] not in base_urls:
                    base_urls[config["base_url"]] = []
                base_urls[config["base_url"]].append(config["component"])

            # Probe every distinct endpoint before deciding to abort, so the
            # user sees all failing URLs at once rather than one per run.
            all_running = True
            for base_url, components in base_urls.items():
                if not check_ollama_running(base_url):
                    print(f"ERROR: Ollama is not accessible at {base_url}")
                    print(f"This URL is used by these components: {', '.join(components)}")
                    all_running = False

            if not all_running:
                print("\nPlease ensure Ollama is running at the configured URLs before starting the server")
                print("Run with --skip-ollama-check to bypass this check")
                sys.exit(1)
            else:
                component_list = [config["component"] for config in ollama_configs]
                print(f"Ollama is running and will be used for: {', '.join(component_list)}")

    # Load settings (this will validate all required env vars)
    settings = get_settings()

    # Wait for Redis to be available (using environment variables or defaults)
    redis_host = os.environ.get("REDIS_HOST", "127.0.0.1")
    redis_port = int(os.environ.get("REDIS_PORT", "6379"))
    if not wait_for_redis(host=redis_host, port=redis_port):
        logging.error("Cannot start server without Redis. Please ensure Redis is running.")
        sys.exit(1)

    # Start ARQ worker in the background
    start_arq_worker()

    # Start server (this is blocking)
    logging.info("Starting Uvicorn server...")
    uvicorn.run(
        "core.api:app",
        host=settings.HOST,
        port=settings.PORT,
        loop="asyncio",
        log_level=args.log,
        # reload=settings.RELOAD # Reload might interfere with subprocess management
    )
# Script entry point; cleanup_processes runs via the atexit hook registered
# above when main() returns or exits.
if __name__ == "__main__":
    main()