main.py: 192 changes (74 additions, 118 deletions)
@@ -17,6 +17,8 @@
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from proton_driver import client
from sqlite_pipeline_manager import SQLitePipelineManager
from mutable_stream_pipeline_manager import MutableStreamPipelineManager

# Configure logging
logging.basicConfig(
@@ -39,6 +41,12 @@
timeplus_password = os.getenv("TIMEPLUS_PASSWORD") or "timeplus@t+"
timeplus_port = int(os.getenv("TIMEPLUS_PORT", "8463"))

# Configuration for metadata storage
# Options: 'sqlite' (default) or 'mutable_stream'
# SQLITE_DB_PATH is ignored when METADATA_STORAGE is 'mutable_stream'
METADATA_STORAGE = os.getenv("METADATA_STORAGE", "sqlite").lower()
SQLITE_DB_PATH = os.getenv("SQLITE_DB_PATH", "pipelines.db")

def wait_for_timeplus_connection(max_retries=30, retry_delay=2):
"""
Wait for Timeplus server to be available by pinging it with 'SELECT 1'
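
Note: the sqlite_pipeline_manager and mutable_stream_pipeline_manager modules imported above are not included in this diff. Judging from the calls PipelineManager makes on self.metadata_manager below (create, get, list_all, delete), both backends are assumed to share one small contract. A minimal sketch of that assumed interface (the class name and docstrings are inferred, not taken from the PR):

from abc import ABC, abstractmethod

class PipelineMetadataStore(ABC):
    """Hypothetical contract both metadata backends are assumed to satisfy."""

    @abstractmethod
    def create(self, pipeline: dict, name: str) -> str:
        """Persist the pipeline definition and return its generated id."""

    @abstractmethod
    def get(self, pipeline_id: str) -> dict:
        """Return {"id", "name", "pipeline", "write_count"} or raise ValueError if missing."""

    @abstractmethod
    def list_all(self) -> list:
        """Return summaries with keys such as "id", "name", "question", "created_at"."""

    @abstractmethod
    def delete(self, pipeline_id: str) -> None:
        """Remove the stored metadata for the given pipeline id."""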
@@ -301,49 +309,56 @@ def generate_pipeline(self):

class PipelineManager:
def __init__(self):
db_logger.info("Initializing PipelineManager")
db_logger.info(f"Connecting to database: {timeplus_user}@{timeplus_host}:{timeplus_port}")
db_logger.info(f"Initializing PipelineManager with metadata storage: {METADATA_STORAGE}")
db_logger.info(f"Connecting to Timeplus: {timeplus_user}@{timeplus_host}:{timeplus_port}")

# Initialize Timeplus client for stream operations
try:
self.client = client.Client(
host=timeplus_host,
user=timeplus_user,
password=timeplus_password,
port=timeplus_port,
)
db_logger.info("Database connection established")
db_logger.info("Timeplus connection established")
except Exception as e:
db_logger.error(f"Failed to connect to database: {e}")
db_logger.error(f"Failed to connect to Timeplus: {e}")
raise
self.pipeline_stream_name = "synthetic_data_pipelines"

# Initialize metadata storage
self._init_pipeline_metadata()

def _init_pipeline_metadata(self):
db_logger.info(f"Initializing pipeline metadata stream: {self.pipeline_stream_name}")
"""Initialize metadata storage based on configuration"""
self.use_sqlite = METADATA_STORAGE == "sqlite"

try:
create_sql = f"""CREATE MUTABLE STREAM IF NOT EXISTS {self.pipeline_stream_name} (
id string,
name string,
pipeline string
)
PRIMARY KEY (id)
"""

db_logger.debug(f"Executing DDL: {create_sql}")
self.client.execute(create_sql)
db_logger.info("Pipeline metadata stream initialized successfully")
if self.use_sqlite:
# Initialize SQLite manager for metadata
try:
self.metadata_manager = SQLitePipelineManager(SQLITE_DB_PATH)
db_logger.info(f"SQLite metadata manager initialized with database: {SQLITE_DB_PATH}")
except Exception as e:
db_logger.error(f"Failed to initialize SQLite manager: {e}")
raise
else:
# Initialize mutable stream manager for metadata
try:
self.metadata_manager = MutableStreamPipelineManager(
timeplus_client=self.client,
pipeline_stream_name="synthetic_data_pipelines"
)
db_logger.info("MutableStreamPipelineManager initialized successfully")
except Exception as e:
db_logger.error(f"Failed to initialize MutableStreamPipelineManager: {e}")
raise

except Exception as e:
db_logger.error(f"Failed to initialize pipeline metadata stream: {e}")
raise

def create(self, pipeline, name):
id = uuid.uuid4().hex
db_logger.info(f"Creating pipeline with ID: {id}, name: {name}")
pipeline_id = uuid.uuid4().hex
db_logger.info(f"Creating pipeline with ID: {pipeline_id}, name: {name}")

# Create the pipeline components
# Create the pipeline components in Timeplus
db_logger.info("Creating pipeline components in timeplus...")
try:
# Create random stream
@@ -352,7 +367,7 @@ def create(self, pipeline, name):
self.client.execute(random_stream_ddl)
db_logger.info(f"Created random stream: {pipeline['random_stream']['name']}")
except Exception as e:
db_logger.error(f"Error creating pipeline components: {e}")
db_logger.error(f"Error creating random stream: {e}")
db_logger.error(f"Failed DDL might be: {pipeline.get('random_stream', {}).get('ddl', 'N/A')}")
raise RuntimeError(f"Failed to create pipeline: {e}")

@@ -380,30 +395,21 @@ def create(self, pipeline, name):
db_logger.info(f"Created materialized view: {pipeline['write_to_kafka_mv']['name']}")

except Exception as e:
db_logger.error(f"Error creating pipeline components: {e}")
db_logger.error(f"Failed DDL might be: {pipeline.get('random_stream', {}).get('ddl', 'N/A')}")
db_logger.error(f"Error creating materialized view: {e}")
db_logger.error(f"Failed DDL might be: {pipeline.get('write_to_kafka_mv', {}).get('ddl', 'N/A')}")
raise RuntimeError(f"Failed to create pipeline: {e}")

# Save metadata to the pipeline stream
# Save metadata using the configured backend
try:
db_logger.info("Saving pipeline metadata...")
pipeline_json = json.dumps(pipeline, indent=2)

insert_sql = f"INSERT INTO {self.pipeline_stream_name} (id, name, pipeline) VALUES"
values = [[id, name, pipeline_json]]

db_logger.debug(f"Executing insert: {insert_sql}")
db_logger.debug(f"Values: id={id}, name={name}, pipeline_length={len(pipeline_json)}")

self.client.execute(insert_sql, values)
db_logger.info("Pipeline metadata saved successfully")

db_logger.info(f"Saving pipeline metadata using {METADATA_STORAGE}...")
saved_id = self.metadata_manager.create(pipeline, name)
db_logger.info(f"Pipeline metadata saved successfully using {METADATA_STORAGE}")
except Exception as e:
db_logger.error(f"Failed to save pipeline metadata: {e}")
raise RuntimeError(f"Failed to save pipeline metadata: {e}")

db_logger.info(f"Pipeline creation completed successfully with ID: {id}")
return id
db_logger.info(f"Pipeline creation completed successfully with ID: {saved_id}")
return saved_id

def _get_pipeline_write_count(self, pipeline_json):
query_sql = f"SELECT COUNT(*) FROM table({pipeline_json['write_to_kafka_mv']['name']}) WHERE _tp_time > earliest_ts()"
@@ -421,43 +427,21 @@ def _get_pipeline_write_count(self, pipeline_json):
db_logger.error(f"Failed to get write count: {e}")
return 0

def get(self, id):
db_logger.info(f"Retrieving pipeline with ID: {id}")
def get(self, pipeline_id):
db_logger.info(f"Retrieving pipeline with ID: {pipeline_id}")

try:
query_sql = f"SELECT name, pipeline FROM table({self.pipeline_stream_name}) WHERE id = '{id}'"
db_logger.debug(f"Executing query: {query_sql}")
# Get from metadata manager (works with both SQLite and mutable stream)
pipeline_info = self.metadata_manager.get(pipeline_id)

result = self.client.execute(query_sql)
db_logger.debug(f"Query result: {len(result) if result else 0} rows")
# Get live write count from Timeplus
live_count = self._get_pipeline_write_count(pipeline_info['pipeline'])

if result:
name = result[0][0]
pipeline_json = result[0][1]

db_logger.debug(f"Found pipeline: name={name}, json_length={len(pipeline_json)}")

# Get Pipeline Stats
count = self._get_pipeline_write_count(json.loads(pipeline_json))
db_logger.info(f"Pipeline {name} has {count} writes")

try:
pipeline_data = json.loads(pipeline_json)
db_logger.info(f"Successfully retrieved pipeline: {name}")

return {
"id": id,
"name": name,
"pipeline": pipeline_data,
"write_count": count
}
except json.JSONDecodeError as e:
db_logger.error(f"Failed to parse pipeline JSON: {e}")
db_logger.debug(f"Malformed JSON: {pipeline_json}")
raise RuntimeError(f"Failed to parse pipeline data: {e}")
else:
db_logger.warning(f"Pipeline with id {id} not found")
raise ValueError(f"Pipeline with id {id} not found.")
# Update the write count
pipeline_info['write_count'] = live_count

db_logger.info(f"Successfully retrieved pipeline: {pipeline_info['name']} (writes: {live_count})")
return pipeline_info

except Exception as e:
if isinstance(e, ValueError):
@@ -469,45 +453,21 @@ def list_all(self):
db_logger.info("Listing all pipelines")

try:
query_sql = f"SELECT id, name, pipeline FROM table({self.pipeline_stream_name})"
db_logger.debug(f"Executing query: {query_sql}")

result = self.client.execute(query_sql)
db_logger.info(f"Found {len(result) if result else 0} pipelines")

pipelines = []
for i, row in enumerate(result):
try:
id, name, pipeline_json = row
pipeline_data = json.loads(pipeline_json)

pipeline_info = {
"id": id,
"name": name,
"question": pipeline_data.get("question", ""),
"created_at": pipeline_data.get("created_at", "")
}
pipelines.append(pipeline_info)

db_logger.debug(f"Pipeline {i}: {id} - {name}")

except Exception as e:
db_logger.error(f"Failed to parse pipeline {i}: {e}")
continue

db_logger.info(f"Successfully processed {len(pipelines)} pipelines")
# List from metadata manager (works with both SQLite and mutable stream)
pipelines = self.metadata_manager.list_all()
db_logger.info(f"Successfully listed {len(pipelines)} pipelines using {METADATA_STORAGE}")
return pipelines

except Exception as e:
db_logger.error(f"Failed to list pipelines: {e}")
raise RuntimeError(f"Failed to list pipelines: {e}")

def delete(self, id):
db_logger.info(f"Deleting pipeline with ID: {id}")
def delete(self, pipeline_id):
db_logger.info(f"Deleting pipeline with ID: {pipeline_id}")

# Get pipeline info first
# Get pipeline info first (this will work with both backends)
try:
pipeline_info = self.get(id)
pipeline_info = self.get(pipeline_id)
pipeline = pipeline_info["pipeline"]
name = pipeline_info["name"]

@@ -517,9 +477,9 @@ def delete(self, id):
db_logger.error(f"Failed to get pipeline info for deletion: {e}")
raise

# Delete the pipeline resources
# Delete the pipeline resources from Timeplus
try:
db_logger.info("Deleting pipeline components...")
db_logger.info("Deleting pipeline components from Timeplus...")

# Delete materialized view
mv_name = pipeline['write_to_kafka_mv']['name']
@@ -544,15 +504,11 @@

# TODO: delete Kafka topic

# Delete pipeline metadata
# Delete pipeline metadata using the configured backend
try:
db_logger.info("Deleting pipeline metadata...")
delete_sql = f"DELETE FROM {self.pipeline_stream_name} WHERE id = '{id}'"
db_logger.debug(f"Executing: {delete_sql}")

self.client.execute(delete_sql)
db_logger.info(f"Pipeline {name} deleted successfully")

db_logger.info(f"Deleting pipeline metadata using {METADATA_STORAGE}...")
self.metadata_manager.delete(pipeline_id)
db_logger.info(f"Pipeline {name} deleted successfully using {METADATA_STORAGE}")
except Exception as e:
db_logger.error(f"Failed to delete pipeline metadata: {e}")
raise RuntimeError(f"Failed to delete pipeline metadata: {e}")
@@ -699,4 +655,4 @@ async def get_manager_page(request: Request):
if __name__ == "__main__":
import uvicorn
app_logger.info("Starting application server on port 5002")
uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", "5002")))
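
Note: the mutable-stream DDL removed from _init_pipeline_metadata stored each pipeline as three columns (id, name, pipeline) keyed by id. Assuming SQLitePipelineManager mirrors that layout, its create/get path might look roughly like this sketch (hypothetical; the real sqlite_pipeline_manager module is not part of this diff):

import json
import sqlite3
import uuid

class SQLitePipelineManagerSketch:
    """Illustrative only; mirrors the old (id, name, pipeline) schema in SQLite."""

    def __init__(self, db_path="pipelines.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS pipelines ("
            "id TEXT PRIMARY KEY, name TEXT, pipeline TEXT)"
        )
        self.conn.commit()

    def create(self, pipeline, name):
        pipeline_id = uuid.uuid4().hex
        self.conn.execute(
            "INSERT INTO pipelines (id, name, pipeline) VALUES (?, ?, ?)",
            (pipeline_id, name, json.dumps(pipeline)),
        )
        self.conn.commit()
        return pipeline_id

    def get(self, pipeline_id):
        row = self.conn.execute(
            "SELECT name, pipeline FROM pipelines WHERE id = ?", (pipeline_id,)
        ).fetchone()
        if row is None:
            raise ValueError(f"Pipeline with id {pipeline_id} not found.")
        name, pipeline_json = row
        return {
            "id": pipeline_id,
            "name": name,
            "pipeline": json.loads(pipeline_json),
            "write_count": 0,
        }

In this sketch write_count starts at 0; as in the get() hunk above, PipelineManager.get() would then overwrite it with the live count queried from Timeplus.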