main.py: 23 changes (8 additions, 15 deletions)
@@ -41,7 +41,7 @@

# Add GLOBAL_LOG_LEVEL for Pipeplines
log_level = os.getenv("GLOBAL_LOG_LEVEL", "INFO").upper()
-logging.basicConfig(level=LOG_LEVELS[log_level])
+logging.basicConfig(level=LOG_LEVELS[log_level], format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')


def get_all_pipelines():
@@ -124,10 +124,10 @@ def install_frontmatter_requirements(requirements):
    if requirements:
        req_list = [req.strip() for req in requirements.split(",")]
        for req in req_list:
-            print(f"Installing requirement: {req}")
+            logging.info(f"Installing requirement: {req}")
            subprocess.check_call([sys.executable, "-m", "pip", "install", req])
    else:
-        print("No requirements found in frontmatter.")
+        logging.info("No requirements found in frontmatter.")


async def load_module_from_path(module_name, module_path):
@@ -153,13 +153,13 @@ async def load_module_from_path(module_name, module_path):
        spec = importlib.util.spec_from_file_location(module_name, module_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
-        print(f"Loaded module: {module.__name__}")
+        logging.info(f"Loaded module: {module.__name__}")
        if hasattr(module, "Pipeline"):
            return module.Pipeline()
        else:
            raise Exception("No Pipeline class found")
    except Exception as e:
-        print(f"Error loading module: {module_name}")
+        logging.error(f"Error loading module {module_name}: {e}")

        # Move the file to the error folder
        failed_pipelines_folder = os.path.join(PIPELINES_DIR, "failed")
@@ -168,7 +168,6 @@ async def load_module_from_path(module_name, module_path):

        failed_file_path = os.path.join(failed_pipelines_folder, f"{module_name}.py")
        os.rename(module_path, failed_file_path)
-        print(e)
        return None


@@ -397,8 +396,6 @@ async def add_pipeline(

    try:
        url = convert_to_raw_url(form_data.url)
-
-        print(url)
        file_path = await download_file(url, dest_folder=PIPELINES_DIR)
        await reload()
        return {
@@ -583,7 +580,7 @@ async def update_valves(pipeline_id: str, form_data: dict):
        if hasattr(pipeline, "on_valves_updated"):
            await pipeline.on_valves_updated()
    except Exception as e:
-        print(e)
+        logging.error(f"Error update valves: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"{str(e)}",
@@ -617,7 +614,7 @@ async def filter_inlet(pipeline_id: str, form_data: FilterForm):
        else:
            return form_data.body
    except Exception as e:
-        print(e)
+        logging.error(f"Error filtering inlet: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"{str(e)}",
@@ -649,7 +646,7 @@ async def filter_outlet(pipeline_id: str, form_data: FilterForm):
        else:
            return form_data.body
    except Exception as e:
-        print(e)
+        logging.error(f"Error filtering outlet: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"{str(e)}",
@@ -672,13 +669,9 @@ async def generate_openai_chat_completion(form_data: OpenAIChatCompletionForm):
        )

    def job():
-        print(form_data.model)
-
        pipeline = app.state.PIPELINES[form_data.model]
        pipeline_id = form_data.model

-        print(pipeline_id)
-
        if pipeline["type"] == "manifold":
            manifold_id, pipeline_id = pipeline_id.split(".", 1)
            pipe = PIPELINE_MODULES[manifold_id].pipe
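
As context for the logging changes above, here is a minimal standalone sketch of how the GLOBAL_LOG_LEVEL-driven basicConfig call in the first hunk shapes the new logging.info / logging.error output. The LOG_LEVELS mapping and the "example_pipeline" name below are illustrative assumptions, not copied from the PR:

import logging
import os

# Assumed stand-in for the LOG_LEVELS dict defined earlier in main.py.
LOG_LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL,
}

# Mirrors the first hunk: the level comes from GLOBAL_LOG_LEVEL (default INFO),
# and every record carries a timestamped "name - level - message" format.
log_level = os.getenv("GLOBAL_LOG_LEVEL", "INFO").upper()
logging.basicConfig(
    level=LOG_LEVELS[log_level],
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

# The replaced print() calls then become level-aware, timestamped records.
logging.info(f"Installing requirement: {'requests'}")
try:
    raise Exception("No Pipeline class found")
except Exception as e:
    # "example_pipeline" is a hypothetical module name used only for illustration.
    logging.error(f"Error loading module example_pipeline: {e}")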