-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathapp.py
More file actions
179 lines (145 loc) · 5.58 KB
/
app.py
File metadata and controls
179 lines (145 loc) · 5.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import logging
import os
import subprocess
import threading
from collections import deque
from typing import Optional

from flask import Flask, jsonify, Response, send_from_directory
from flask_pymongo import PyMongo
#===================== Initialize Flask App =====================
app = Flask(__name__)
# Register blueprints for modular routing
from routes.front import front
from routes.filters import filters
from routes.api_mongo import api_mongo
app.register_blueprint(front)
app.register_blueprint(filters)
app.register_blueprint(api_mongo)
#===================== Database Configuration =====================
# NOTE(review): hard-coded URI assumes a reachable host named 'mongo'
# (docker-compose style) and database 'April' — confirm per deployment.
app.config["MONGO_URI"] = "mongodb://mongo:27017/April"
mongo = PyMongo(app)
# Attach the PyMongo handle to the app object — presumably so blueprints
# can reach it via current_app.mongo; verify against the blueprint code.
app.mongo = mongo
#===================== Pipeline config =====================
# Handles on the two background subprocesses; None means "not running".
# Managed by the /start-pipeline, /stop-pipeline and /run-nlp routes below.
pipeline_process: Optional[subprocess.Popen] = None
nlp_process: Optional[subprocess.Popen] = None
def log_pipeline_output(process: subprocess.Popen) -> None:
    """
    Stream the stdout and stderr of *process* into 'pipeline.log'.

    The previous implementation drained stdout to EOF before touching
    stderr; a child that filled the stderr pipe buffer in the meantime
    would block on write while this function blocked on stdout — a
    classic pipe deadlock. Each stream now gets its own reader thread,
    so neither pipe can back up. stderr lines keep the "ERROR: " prefix.

    Blocks until both streams reach EOF (callers already run this in a
    daemon thread, so overall behavior is unchanged).

    Args:
        process (subprocess.Popen): Process created with
            stdout=PIPE, stderr=PIPE and universal_newlines=True.
    """
    def _drain(stream, prefix: str) -> None:
        # Reader for one pipe: append every line to the shared log file.
        # Append mode + per-line flush keeps writes from the two threads
        # interleaved at (roughly) line granularity, which is fine for logs.
        try:
            with open('pipeline.log', 'a') as log_file:
                for line in iter(stream.readline, ''):
                    log_file.write(f"{prefix}{line}")
                    log_file.flush()
        except Exception as e:
            with open('pipeline.log', 'a') as log_file:
                log_file.write(f"Exception occurred in log_pipeline_output: {e}\n")
                log_file.flush()

    readers = []
    if process.stdout is not None:
        readers.append(threading.Thread(target=_drain, args=(process.stdout, ''), daemon=True))
    if process.stderr is not None:
        readers.append(threading.Thread(target=_drain, args=(process.stderr, 'ERROR: '), daemon=True))
    for t in readers:
        t.start()
    for t in readers:
        t.join()
#===================== Routes =====================
@app.route('/start-pipeline', methods=['POST'])
def start_pipeline() -> "str | tuple[str, int]":
    """
    Start the pipeline subprocess if it is not already running.

    Returns:
        str: "Pipeline started." on success (HTTP 200).
        tuple[str, int]: (message, 400) if a pipeline is already running,
            or (message, 500) if launching the subprocess failed.
        (The old ``-> str`` annotation was wrong: both error paths return
        a (body, status) tuple.)
    """
    global pipeline_process
    # poll() is None while the child is still alive; reject duplicate starts.
    if pipeline_process and pipeline_process.poll() is None:
        return "Pipeline is already running.", 400
    try:
        pipeline_process = subprocess.Popen(
            ['python3', 'functions/launch_pipeline1.py'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            bufsize=1,                 # line-buffered text pipes
            universal_newlines=True
        )
        # Drain the child's pipes in the background so they never fill up.
        log_thread = threading.Thread(
            target=log_pipeline_output, args=(pipeline_process,), daemon=True
        )
        log_thread.start()
        return "Pipeline started."
    except Exception as e:
        # Record the failure in the same file the UI tails via /get-logs.
        with open('pipeline.log', 'a') as log_file:
            log_file.write(f"Failed to start pipeline: {str(e)}\n")
        return f"Failed to start pipeline: {str(e)}", 500
@app.route('/data/<filename>', methods=['GET'])
def serve_csv(filename: str) -> Response:
    """
    Serve a file for download from the application's 'data' directory.

    Args:
        filename (str): Name of the file inside the 'data' directory.

    Returns:
        Response: Flask response streaming the requested file.
    """
    data_dir = os.path.join(app.root_path, 'data')
    return send_from_directory(data_dir, filename)
@app.route('/stop-pipeline', methods=['POST'])
def stop_pipeline() -> "str | tuple[str, int]":
    """
    Stop the running pipeline process, if any.

    Returns:
        str: "Pipeline stopped." on success (HTTP 200).
        tuple[str, int]: (message, 400) if nothing is running, or
            (message, 500) if termination failed.
        (The old ``-> str`` annotation was wrong: both error paths return
        a (body, status) tuple.)
    """
    global pipeline_process
    # poll() returning non-None means the child already exited.
    if not pipeline_process or pipeline_process.poll() is not None:
        return "Pipeline is not running.", 400
    try:
        with open('pipeline.log', 'a') as log_file:
            log_file.write("Pipeline is being stopped by user.\n")
        pipeline_process.terminate()
        try:
            # Reap the child so it does not linger as a zombie; escalate to
            # SIGKILL if it ignores SIGTERM. The original terminate()-only
            # path never waited on the process.
            pipeline_process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            pipeline_process.kill()
            pipeline_process.wait()
        pipeline_process = None
        with open('pipeline.log', 'a') as log_file:
            log_file.write("Pipeline has been successfully stopped.\n")
        return "Pipeline stopped."
    except Exception as e:
        with open('pipeline.log', 'a') as log_file:
            log_file.write(f"Failed to stop pipeline: {e}\n")
        return f"Failed to stop pipeline: {e}", 500
@app.route('/run-nlp', methods=['POST'])
def run_nlp() -> "str | tuple[str, int]":
    """
    Start the NLP processing subprocess if it is not already running.

    Returns:
        str: "NLP processing started." on success (HTTP 200).
        tuple[str, int]: (message, 400) if already running, or
            (message, 500) if launching the subprocess failed.
        (The old ``-> str`` annotation was wrong: both error paths return
        a (body, status) tuple.)
    """
    global nlp_process
    # poll() is None while the child is still alive; reject duplicate starts.
    if nlp_process and nlp_process.poll() is None:
        return "NLP is already running.", 400
    try:
        nlp_process = subprocess.Popen(
            ['python3', 'functions/launch_pipeline2.py'],  # Ensure this runs the NLP processing script
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            bufsize=1,                 # line-buffered text pipes
            universal_newlines=True
        )
        # Drain the child's pipes in the background so they never fill up.
        log_thread = threading.Thread(
            target=log_pipeline_output, args=(nlp_process,), daemon=True
        )
        log_thread.start()
        return "NLP processing started."
    except Exception as e:
        logging.error(f"Failed to start NLP processing: {str(e)}")
        # Mirror the failure into pipeline.log as well, so /get-logs surfaces
        # it — consistent with start_pipeline's error handling.
        with open('pipeline.log', 'a') as log_file:
            log_file.write(f"Failed to start NLP processing: {str(e)}\n")
        return f"Failed to start NLP processing: {str(e)}", 500
@app.route('/get-logs', methods=['GET'])
def get_logs() -> Response:
    """
    Return the last 30 lines of the pipeline log as JSON.

    The previous annotation ``-> jsonify`` named a function, not a type;
    the route actually returns a flask.Response.

    Returns:
        Response: JSON payload of the form {'logs': [...]}; the list is
            empty when no log file exists yet.
    """
    log_file = 'pipeline.log'
    if not os.path.exists(log_file):
        return jsonify({'logs': []})
    with open(log_file, 'r') as f:
        # deque(maxlen=30) keeps only the tail in memory, instead of
        # materializing the entire (potentially large) log via readlines().
        tail = deque(f, maxlen=30)
    return jsonify({'logs': list(tail)})
#===================== Run the Flask Application =====================
if __name__ == '__main__':
    # NOTE(review): debug=True enables the interactive Werkzeug debugger,
    # which allows arbitrary code execution; combined with host='0.0.0.0'
    # this must not reach production — confirm the deployment overrides it.
    app.run(host='0.0.0.0', port=5000, debug=True)