diff --git a/main.py b/main.py
index 83659b2..a74817d 100644
--- a/main.py
+++ b/main.py
@@ -17,6 +17,8 @@
 from agno.agent import Agent
 from agno.models.openai import OpenAIChat
 from proton_driver import client
+from sqlite_pipeline_manager import SQLitePipelineManager
+from mutable_stream_pipeline_manager import MutableStreamPipelineManager
 
 # Configure logging
 logging.basicConfig(
@@ -39,6 +41,12 @@
 timeplus_password = os.getenv("TIMEPLUS_PASSWORD") or "timeplus@t+"
 timeplus_port = int(os.getenv("TIMEPLUS_PORT", "8463"))
 
+# Configuration for metadata storage
+# Options: 'sqlite' (default) or 'mutable_stream'
+# SQLITE_DB_PATH is ignored when METADATA_STORAGE is 'mutable_stream'
+METADATA_STORAGE = os.getenv("METADATA_STORAGE", "sqlite").lower()
+SQLITE_DB_PATH = os.getenv("SQLITE_DB_PATH", "pipelines.db")
+
 def wait_for_timeplus_connection(max_retries=30, retry_delay=2):
     """
     Wait for Timeplus server to be available by pinging it with 'SELECT 1'
@@ -301,9 +309,10 @@ def generate_pipeline(self):
 
 class PipelineManager:
     def __init__(self):
-        db_logger.info("Initializing PipelineManager")
-        db_logger.info(f"Connecting to database: {timeplus_user}@{timeplus_host}:{timeplus_port}")
+        db_logger.info(f"Initializing PipelineManager with metadata storage: {METADATA_STORAGE}")
+        db_logger.info(f"Connecting to Timeplus: {timeplus_user}@{timeplus_host}:{timeplus_port}")
 
+        # Initialize Timeplus client for stream operations
         try:
             self.client = client.Client(
                 host=timeplus_host,
@@ -311,39 +320,45 @@ def __init__(self):
                 password=timeplus_password,
                 port=timeplus_port,
             )
-            db_logger.info("Database connection established")
+            db_logger.info("Timeplus connection established")
         except Exception as e:
-            db_logger.error(f"Failed to connect to database: {e}")
+            db_logger.error(f"Failed to connect to Timeplus: {e}")
             raise
-
-        self.pipeline_stream_name = "synthetic_data_pipelines"
+
+        # Initialize metadata storage
         self._init_pipeline_metadata()
 
     def _init_pipeline_metadata(self):
-        db_logger.info(f"Initializing pipeline metadata stream: {self.pipeline_stream_name}")
+        """Initialize metadata storage based on configuration"""
+        # Initialize metadata storage based on configuration
+        self.use_sqlite = METADATA_STORAGE == "sqlite"
 
-        try:
-            create_sql = f"""CREATE MUTABLE STREAM IF NOT EXISTS {self.pipeline_stream_name} (
-                id string,
-                name string,
-                pipeline string
-            )
-            PRIMARY KEY (id)
-            """
-
-            db_logger.debug(f"Executing DDL: {create_sql}")
-            self.client.execute(create_sql)
-            db_logger.info("Pipeline metadata stream initialized successfully")
+        if self.use_sqlite:
+            # Initialize SQLite manager for metadata
+            try:
+                self.metadata_manager = SQLitePipelineManager(SQLITE_DB_PATH)
+                db_logger.info(f"SQLite metadata manager initialized with database: {SQLITE_DB_PATH}")
+            except Exception as e:
+                db_logger.error(f"Failed to initialize SQLite manager: {e}")
+                raise
+        else:
+            # Initialize mutable stream manager for metadata
+            try:
+                self.metadata_manager = MutableStreamPipelineManager(
+                    timeplus_client=self.client,
+                    pipeline_stream_name="synthetic_data_pipelines"
+                )
+                db_logger.info("MutableStreamPipelineManager initialized successfully")
+            except Exception as e:
+                db_logger.error(f"Failed to initialize MutableStreamPipelineManager: {e}")
+                raise
 
-        except Exception as e:
-            db_logger.error(f"Failed to initialize pipeline metadata stream: {e}")
-            raise
 
     def create(self, pipeline, name):
-        id = uuid.uuid4().hex
-        db_logger.info(f"Creating pipeline with ID: {id}, name: {name}")
+        pipeline_id = uuid.uuid4().hex
+        db_logger.info(f"Creating pipeline with ID: {pipeline_id}, name: {name}")
{pipeline_id}, name: {name}") - # Create the pipeline components + # Create the pipeline components in Timeplus db_logger.info("Creating pipeline components in timeplus...") try: # Create random stream @@ -352,7 +367,7 @@ def create(self, pipeline, name): self.client.execute(random_stream_ddl) db_logger.info(f"Created random stream: {pipeline['random_stream']['name']}") except Exception as e: - db_logger.error(f"Error creating pipeline components: {e}") + db_logger.error(f"Error creating random stream: {e}") db_logger.error(f"Failed DDL might be: {pipeline.get('random_stream', {}).get('ddl', 'N/A')}") raise RuntimeError(f"Failed to create pipeline: {e}") @@ -380,30 +395,21 @@ def create(self, pipeline, name): db_logger.info(f"Created materialized view: {pipeline['write_to_kafka_mv']['name']}") except Exception as e: - db_logger.error(f"Error creating pipeline components: {e}") - db_logger.error(f"Failed DDL might be: {pipeline.get('random_stream', {}).get('ddl', 'N/A')}") + db_logger.error(f"Error creating materialized view: {e}") + db_logger.error(f"Failed DDL might be: {pipeline.get('write_to_kafka_mv', {}).get('ddl', 'N/A')}") raise RuntimeError(f"Failed to create pipeline: {e}") - # Save metadata to the pipeline stream + # Save metadata using the configured backend try: - db_logger.info("Saving pipeline metadata...") - pipeline_json = json.dumps(pipeline, indent=2) - - insert_sql = f"INSERT INTO {self.pipeline_stream_name} (id, name, pipeline) VALUES" - values = [[id, name, pipeline_json]] - - db_logger.debug(f"Executing insert: {insert_sql}") - db_logger.debug(f"Values: id={id}, name={name}, pipeline_length={len(pipeline_json)}") - - self.client.execute(insert_sql, values) - db_logger.info("Pipeline metadata saved successfully") - + db_logger.info(f"Saving pipeline metadata using {METADATA_STORAGE}...") + saved_id = self.metadata_manager.create(pipeline, name) + db_logger.info(f"Pipeline metadata saved successfully using {METADATA_STORAGE}") except Exception as e: db_logger.error(f"Failed to save pipeline metadata: {e}") raise RuntimeError(f"Failed to save pipeline metadata: {e}") - db_logger.info(f"Pipeline creation completed successfully with ID: {id}") - return id + db_logger.info(f"Pipeline creation completed successfully with ID: {saved_id}") + return saved_id def _get_pipeline_write_count(self, pipeline_json): query_sql = f"SELECT COUNT(*) FROM table({pipeline_json['write_to_kafka_mv']['name']}) WHERE _tp_time > earliest_ts()" @@ -421,43 +427,21 @@ def _get_pipeline_write_count(self, pipeline_json): db_logger.error(f"Failed to get write count: {e}") return 0 - def get(self, id): - db_logger.info(f"Retrieving pipeline with ID: {id}") + def get(self, pipeline_id): + db_logger.info(f"Retrieving pipeline with ID: {pipeline_id}") try: - query_sql = f"SELECT name, pipeline FROM table({self.pipeline_stream_name}) WHERE id = '{id}'" - db_logger.debug(f"Executing query: {query_sql}") + # Get from metadata manager (works with both SQLite and mutable stream) + pipeline_info = self.metadata_manager.get(pipeline_id) - result = self.client.execute(query_sql) - db_logger.debug(f"Query result: {len(result) if result else 0} rows") + # Get live write count from Timeplus + live_count = self._get_pipeline_write_count(pipeline_info['pipeline']) - if result: - name = result[0][0] - pipeline_json = result[0][1] - - db_logger.debug(f"Found pipeline: name={name}, json_length={len(pipeline_json)}") - - # Get Pipeline Stats - count = self._get_pipeline_write_count(json.loads(pipeline_json)) - 
db_logger.info(f"Pipeline {name} has {count} writes") - - try: - pipeline_data = json.loads(pipeline_json) - db_logger.info(f"Successfully retrieved pipeline: {name}") - - return { - "id": id, - "name": name, - "pipeline": pipeline_data, - "write_count": count - } - except json.JSONDecodeError as e: - db_logger.error(f"Failed to parse pipeline JSON: {e}") - db_logger.debug(f"Malformed JSON: {pipeline_json}") - raise RuntimeError(f"Failed to parse pipeline data: {e}") - else: - db_logger.warning(f"Pipeline with id {id} not found") - raise ValueError(f"Pipeline with id {id} not found.") + # Update the write count + pipeline_info['write_count'] = live_count + + db_logger.info(f"Successfully retrieved pipeline: {pipeline_info['name']} (writes: {live_count})") + return pipeline_info except Exception as e: if isinstance(e, ValueError): @@ -469,45 +453,21 @@ def list_all(self): db_logger.info("Listing all pipelines") try: - query_sql = f"SELECT id, name, pipeline FROM table({self.pipeline_stream_name})" - db_logger.debug(f"Executing query: {query_sql}") - - result = self.client.execute(query_sql) - db_logger.info(f"Found {len(result) if result else 0} pipelines") - - pipelines = [] - for i, row in enumerate(result): - try: - id, name, pipeline_json = row - pipeline_data = json.loads(pipeline_json) - - pipeline_info = { - "id": id, - "name": name, - "question": pipeline_data.get("question", ""), - "created_at": pipeline_data.get("created_at", "") - } - pipelines.append(pipeline_info) - - db_logger.debug(f"Pipeline {i}: {id} - {name}") - - except Exception as e: - db_logger.error(f"Failed to parse pipeline {i}: {e}") - continue - - db_logger.info(f"Successfully processed {len(pipelines)} pipelines") + # List from metadata manager (works with both SQLite and mutable stream) + pipelines = self.metadata_manager.list_all() + db_logger.info(f"Successfully listed {len(pipelines)} pipelines using {METADATA_STORAGE}") return pipelines except Exception as e: db_logger.error(f"Failed to list pipelines: {e}") raise RuntimeError(f"Failed to list pipelines: {e}") - def delete(self, id): - db_logger.info(f"Deleting pipeline with ID: {id}") + def delete(self, pipeline_id): + db_logger.info(f"Deleting pipeline with ID: {pipeline_id}") - # Get pipeline info first + # Get pipeline info first (this will work with both backends) try: - pipeline_info = self.get(id) + pipeline_info = self.get(pipeline_id) pipeline = pipeline_info["pipeline"] name = pipeline_info["name"] @@ -517,9 +477,9 @@ def delete(self, id): db_logger.error(f"Failed to get pipeline info for deletion: {e}") raise - # Delete the pipeline resources + # Delete the pipeline resources from Timeplus try: - db_logger.info("Deleting pipeline components...") + db_logger.info("Deleting pipeline components from Timeplus...") # Delete materialized view mv_name = pipeline['write_to_kafka_mv']['name'] @@ -544,15 +504,11 @@ def delete(self, id): # TODO: delete Kafka topic - # Delete pipeline metadata + # Delete pipeline metadata using the configured backend try: - db_logger.info("Deleting pipeline metadata...") - delete_sql = f"DELETE FROM {self.pipeline_stream_name} WHERE id = '{id}'" - db_logger.debug(f"Executing: {delete_sql}") - - self.client.execute(delete_sql) - db_logger.info(f"Pipeline {name} deleted successfully") - + db_logger.info(f"Deleting pipeline metadata using {METADATA_STORAGE}...") + self.metadata_manager.delete(pipeline_id) + db_logger.info(f"Pipeline {name} deleted successfully using {METADATA_STORAGE}") except Exception as e: 
db_logger.error(f"Failed to delete pipeline metadata: {e}") raise RuntimeError(f"Failed to delete pipeline metadata: {e}") @@ -699,4 +655,4 @@ async def get_manager_page(request: Request): if __name__ == "__main__": import uvicorn app_logger.info("Starting application server on port 5002") - uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", "5002"))) \ No newline at end of file + uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", "5002"))) diff --git a/mutable_stream_pipeline_manager.py b/mutable_stream_pipeline_manager.py new file mode 100644 index 0000000..fd4a296 --- /dev/null +++ b/mutable_stream_pipeline_manager.py @@ -0,0 +1,164 @@ +import json +import uuid +import logging +from proton_driver import client + +db_logger = logging.getLogger("database") + +class MutableStreamPipelineManager: + def __init__(self, timeplus_client, pipeline_stream_name="synthetic_data_pipelines"): + """Initialize mutable stream-based pipeline manager""" + db_logger.info("Initializing MutableStreamPipelineManager") + self.client = timeplus_client + self.pipeline_stream_name = pipeline_stream_name + self._init_database() + + def _init_database(self): + """Initialize the mutable stream for storing pipeline metadata""" + db_logger.info(f"Initializing pipeline metadata stream: {self.pipeline_stream_name}") + + try: + create_sql = f"""CREATE MUTABLE STREAM IF NOT EXISTS {self.pipeline_stream_name} ( + id string, + name string, + pipeline string + ) + PRIMARY KEY (id) + """ + + db_logger.debug(f"Executing DDL: {create_sql}") + self.client.execute(create_sql) + db_logger.info("Pipeline metadata stream initialized successfully") + + except Exception as e: + db_logger.error(f"Failed to initialize pipeline metadata stream: {e}") + raise RuntimeError(f"Failed to initialize pipeline metadata stream: {e}") + + def create(self, pipeline, name): + """Create a new pipeline entry in mutable stream""" + pipeline_id = uuid.uuid4().hex + db_logger.info(f"Creating pipeline with ID: {pipeline_id}, name: {name}") + + try: + # Convert pipeline to JSON string + pipeline_json = json.dumps(pipeline, indent=2) + + # Insert new pipeline + insert_sql = f"INSERT INTO {self.pipeline_stream_name} (id, name, pipeline) VALUES" + values = [[pipeline_id, name, pipeline_json]] + + db_logger.debug(f"Executing insert: {insert_sql}") + db_logger.debug(f"Values: id={pipeline_id}, name={name}, pipeline_length={len(pipeline_json)}") + + self.client.execute(insert_sql, values) + db_logger.info(f"Pipeline saved successfully with ID: {pipeline_id}") + return pipeline_id + + except Exception as e: + db_logger.error(f"Failed to create pipeline: {e}") + raise RuntimeError(f"Failed to create pipeline: {e}") + + def get(self, pipeline_id): + """Get a specific pipeline by ID from mutable stream""" + db_logger.info(f"Retrieving pipeline with ID: {pipeline_id}") + + try: + # Query for the pipeline + query_sql = f"SELECT name, pipeline FROM table({self.pipeline_stream_name}) WHERE id = '{pipeline_id}'" + db_logger.debug(f"Executing query: {query_sql}") + + result = self.client.execute(query_sql) + db_logger.debug(f"Query result: {len(result) if result else 0} rows") + + if result: + name = result[0][0] + pipeline_json = result[0][1] + + db_logger.debug(f"Found pipeline: name={name}, json_length={len(pipeline_json)}") + + try: + pipeline_data = json.loads(pipeline_json) + db_logger.info(f"Successfully retrieved pipeline from mutable stream: {name}") + + return { + "id": pipeline_id, + "name": name, + "pipeline": pipeline_data, + "write_count": 
diff --git a/mutable_stream_pipeline_manager.py b/mutable_stream_pipeline_manager.py
new file mode 100644
index 0000000..fd4a296
--- /dev/null
+++ b/mutable_stream_pipeline_manager.py
@@ -0,0 +1,164 @@
+import json
+import uuid
+import logging
+from proton_driver import client
+
+db_logger = logging.getLogger("database")
+
+class MutableStreamPipelineManager:
+    def __init__(self, timeplus_client, pipeline_stream_name="synthetic_data_pipelines"):
+        """Initialize mutable stream-based pipeline manager"""
+        db_logger.info("Initializing MutableStreamPipelineManager")
+        self.client = timeplus_client
+        self.pipeline_stream_name = pipeline_stream_name
+        self._init_database()
+
+    def _init_database(self):
+        """Initialize the mutable stream for storing pipeline metadata"""
+        db_logger.info(f"Initializing pipeline metadata stream: {self.pipeline_stream_name}")
+
+        try:
+            create_sql = f"""CREATE MUTABLE STREAM IF NOT EXISTS {self.pipeline_stream_name} (
+                id string,
+                name string,
+                pipeline string
+            )
+            PRIMARY KEY (id)
+            """
+
+            db_logger.debug(f"Executing DDL: {create_sql}")
+            self.client.execute(create_sql)
+            db_logger.info("Pipeline metadata stream initialized successfully")
+
+        except Exception as e:
+            db_logger.error(f"Failed to initialize pipeline metadata stream: {e}")
+            raise RuntimeError(f"Failed to initialize pipeline metadata stream: {e}")
+
+    def create(self, pipeline, name):
+        """Create a new pipeline entry in mutable stream"""
+        pipeline_id = uuid.uuid4().hex
+        db_logger.info(f"Creating pipeline with ID: {pipeline_id}, name: {name}")
+
+        try:
+            # Convert pipeline to JSON string
+            pipeline_json = json.dumps(pipeline, indent=2)
+
+            # Insert new pipeline
+            insert_sql = f"INSERT INTO {self.pipeline_stream_name} (id, name, pipeline) VALUES"
+            values = [[pipeline_id, name, pipeline_json]]
+
+            db_logger.debug(f"Executing insert: {insert_sql}")
+            db_logger.debug(f"Values: id={pipeline_id}, name={name}, pipeline_length={len(pipeline_json)}")
+
+            self.client.execute(insert_sql, values)
+            db_logger.info(f"Pipeline saved successfully with ID: {pipeline_id}")
+            return pipeline_id
+
+        except Exception as e:
+            db_logger.error(f"Failed to create pipeline: {e}")
+            raise RuntimeError(f"Failed to create pipeline: {e}")
+
+    def get(self, pipeline_id):
+        """Get a specific pipeline by ID from mutable stream"""
+        db_logger.info(f"Retrieving pipeline with ID: {pipeline_id}")
+
+        try:
+            # Query for the pipeline
+            query_sql = f"SELECT name, pipeline FROM table({self.pipeline_stream_name}) WHERE id = '{pipeline_id}'"
+            db_logger.debug(f"Executing query: {query_sql}")
+
+            result = self.client.execute(query_sql)
+            db_logger.debug(f"Query result: {len(result) if result else 0} rows")
+
+            if result:
+                name = result[0][0]
+                pipeline_json = result[0][1]
+
+                db_logger.debug(f"Found pipeline: name={name}, json_length={len(pipeline_json)}")
+
+                try:
+                    pipeline_data = json.loads(pipeline_json)
+                    db_logger.info(f"Successfully retrieved pipeline from mutable stream: {name}")
+
+                    return {
+                        "id": pipeline_id,
+                        "name": name,
+                        "pipeline": pipeline_data,
+                        "write_count": 0  # Will be updated by caller if needed
+                    }
+                except json.JSONDecodeError as e:
+                    db_logger.error(f"Failed to parse pipeline JSON: {e}")
+                    db_logger.debug(f"Malformed JSON: {pipeline_json}")
+                    raise RuntimeError(f"Failed to parse pipeline data: {e}")
+            else:
+                db_logger.warning(f"Pipeline with id {pipeline_id} not found")
+                raise ValueError(f"Pipeline with id {pipeline_id} not found.")
+
+        except ValueError:
+            # Re-raise ValueError for not found
+            raise
+        except Exception as e:
+            db_logger.error(f"Failed to retrieve pipeline {pipeline_id}: {e}")
+            raise RuntimeError(f"Failed to get pipeline: {e}")
+
+    def list_all(self):
+        """List all pipelines from mutable stream"""
+        db_logger.info("Listing all pipelines")
+
+        try:
+            # Query for all pipelines
+            query_sql = f"SELECT id, name, pipeline FROM table({self.pipeline_stream_name})"
+            db_logger.debug(f"Executing query: {query_sql}")
+
+            result = self.client.execute(query_sql)
+            db_logger.info(f"Found {len(result) if result else 0} pipelines")
+
+            pipelines = []
+            for i, row in enumerate(result):
+                try:
+                    pipeline_id, name, pipeline_json = row
+                    pipeline_data = json.loads(pipeline_json)
+
+                    pipeline_info = {
+                        "id": pipeline_id,
+                        "name": name,
+                        "question": pipeline_data.get("question", ""),
+                        "created_at": pipeline_data.get("created_at", "")
+                    }
+                    pipelines.append(pipeline_info)
+
+                    db_logger.debug(f"Pipeline {i}: {pipeline_id} - {name}")
+
+                except Exception as e:
+                    db_logger.error(f"Failed to parse pipeline {i}: {e}")
+                    continue
+
+            db_logger.info(f"Successfully processed {len(pipelines)} pipelines from mutable stream")
+            return pipelines
+
+        except Exception as e:
+            db_logger.error(f"Failed to list pipelines: {e}")
+            raise RuntimeError(f"Failed to list pipelines: {e}")
+
+    def delete(self, pipeline_id):
+        """Delete a pipeline by ID from mutable stream"""
+        db_logger.info(f"Deleting pipeline metadata with ID: {pipeline_id}")
+
+        try:
+            # Delete the pipeline metadata
+            delete_sql = f"DELETE FROM {self.pipeline_stream_name} WHERE id = '{pipeline_id}'"
+            db_logger.debug(f"Executing: {delete_sql}")
+
+            self.client.execute(delete_sql)
+            db_logger.info(f"Pipeline metadata deleted successfully from mutable stream")
+
+        except Exception as e:
+            db_logger.error(f"Failed to delete pipeline metadata {pipeline_id}: {e}")
+            raise RuntimeError(f"Failed to delete pipeline metadata: {e}")
+
+    def update_write_count(self, pipeline_id, count):
+        """Update write count - not implemented for mutable stream (handled externally)"""
+        # Note: Mutable stream doesn't store write_count directly
+        # It's calculated on-the-fly by the calling code
+        db_logger.debug(f"Write count update requested for pipeline {pipeline_id}: {count}")
+        pass
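MutableStreamPipelineManager is essentially the metadata logic that previously lived inline in PipelineManager: a single mutable stream keyed by id, the pipeline definition stored as a JSON string, and write_count always returned as 0 so the caller can substitute the live value. A rough standalone usage sketch, assuming a reachable Timeplus instance; the host and credentials below are placeholders:

    from proton_driver import client
    from mutable_stream_pipeline_manager import MutableStreamPipelineManager

    tp = client.Client(host="localhost", user="default", password="timeplus@t+", port=8463)
    manager = MutableStreamPipelineManager(timeplus_client=tp)

    # The pipeline dict is stored as opaque JSON; list_all() only reads back
    # the "question" and "created_at" keys.
    pipeline = {"question": "demo traffic", "created_at": "2024-01-01"}
    pipeline_id = manager.create(pipeline, name="demo")

    info = manager.get(pipeline_id)   # info["write_count"] is always 0 from this backend
    print(info["name"], len(manager.list_all()))
    manager.delete(pipeline_id)
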
diff --git a/sqlite_pipeline_manager.py b/sqlite_pipeline_manager.py
new file mode 100644
index 0000000..c0c46b4
--- /dev/null
+++ b/sqlite_pipeline_manager.py
@@ -0,0 +1,209 @@
+import sqlite3
+import json
+import uuid
+import logging
+from pathlib import Path
+
+db_logger = logging.getLogger("database")
+
+class SQLitePipelineManager:
+    def __init__(self, db_file="pipelines.db"):
+        """Initialize SQLite-based pipeline manager"""
+        db_logger.info("Initializing SQLitePipelineManager")
+        self.db_file = Path(db_file)
+        self._init_database()
+
+    def _init_database(self):
+        """Initialize the SQLite database and create tables if they don't exist"""
+        db_logger.info(f"Initializing database: {self.db_file}")
+
+        try:
+            with sqlite3.connect(self.db_file) as conn:
+                cursor = conn.cursor()
+
+                # Create pipelines table
+                create_table_sql = """
+                    CREATE TABLE IF NOT EXISTS pipelines (
+                        id TEXT PRIMARY KEY,
+                        name TEXT NOT NULL,
+                        pipeline_json TEXT NOT NULL,
+                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                        write_count INTEGER DEFAULT 0
+                    )
+                """
+
+                cursor.execute(create_table_sql)
+                conn.commit()
+                db_logger.info("Database initialized successfully")
+
+        except Exception as e:
+            db_logger.error(f"Failed to initialize database: {e}")
+            raise RuntimeError(f"Failed to initialize database: {e}")
+
+    def _get_connection(self):
+        """Get a database connection"""
+        return sqlite3.connect(self.db_file)
+
+    def create(self, pipeline, name):
+        """Create a new pipeline entry"""
+        pipeline_id = uuid.uuid4().hex
+        db_logger.info(f"Creating pipeline with ID: {pipeline_id}, name: {name}")
+
+        try:
+            with self._get_connection() as conn:
+                cursor = conn.cursor()
+
+                # Convert pipeline to JSON string
+                pipeline_json = json.dumps(pipeline, indent=2)
+
+                # Insert new pipeline
+                insert_sql = """
+                    INSERT INTO pipelines (id, name, pipeline_json, write_count)
+                    VALUES (?, ?, ?, ?)
+                """
+
+                cursor.execute(insert_sql, (pipeline_id, name, pipeline_json, 0))
+                conn.commit()
+
+                db_logger.info(f"Pipeline saved successfully with ID: {pipeline_id}")
+                return pipeline_id
+
+        except Exception as e:
+            db_logger.error(f"Failed to create pipeline: {e}")
+            raise RuntimeError(f"Failed to create pipeline: {e}")
+
+    def get(self, pipeline_id):
+        """Get a specific pipeline by ID"""
+        db_logger.info(f"Retrieving pipeline with ID: {pipeline_id}")
+
+        try:
+            with self._get_connection() as conn:
+                cursor = conn.cursor()
+
+                # Query for the pipeline
+                select_sql = """
+                    SELECT name, pipeline_json, write_count
+                    FROM pipelines
+                    WHERE id = ?
+                """
+
+                cursor.execute(select_sql, (pipeline_id,))
+                result = cursor.fetchone()
+
+                if not result:
+                    db_logger.warning(f"Pipeline with id {pipeline_id} not found")
+                    raise ValueError(f"Pipeline with id {pipeline_id} not found.")
+
+                name, pipeline_json, write_count = result
+
+                # Parse JSON
+                pipeline_data = json.loads(pipeline_json)
+
+                db_logger.info(f"Successfully retrieved pipeline: {name}")
+
+                return {
+                    "id": pipeline_id,
+                    "name": name,
+                    "pipeline": pipeline_data,
+                    "write_count": write_count or 0
+                }
+
+        except ValueError:
+            # Re-raise ValueError for not found
+            raise
+        except Exception as e:
+            db_logger.error(f"Failed to retrieve pipeline {pipeline_id}: {e}")
+            raise RuntimeError(f"Failed to get pipeline: {e}")
+
+    def list_all(self):
+        """List all pipelines"""
+        db_logger.info("Listing all pipelines")
+
+        try:
+            with self._get_connection() as conn:
+                cursor = conn.cursor()
+
+                # Query for all pipelines
+                select_sql = """
+                    SELECT id, name, pipeline_json, created_at
+                    FROM pipelines
+                    ORDER BY created_at DESC
+                """
+
+                cursor.execute(select_sql)
+                results = cursor.fetchall()
+
+                pipelines = []
+                for row in results:
+                    try:
+                        pipeline_id, name, pipeline_json, created_at = row
+                        pipeline_data = json.loads(pipeline_json)
+
+                        pipeline_info = {
+                            "id": pipeline_id,
+                            "name": name,
+                            "question": pipeline_data.get("question", ""),
+                            "created_at": created_at or ""
+                        }
+                        pipelines.append(pipeline_info)
+
+                    except Exception as e:
+                        db_logger.error(f"Failed to parse pipeline {pipeline_id}: {e}")
+                        continue
+
+                db_logger.info(f"Found {len(pipelines)} pipelines")
+                return pipelines
+
+        except Exception as e:
+            db_logger.error(f"Failed to list pipelines: {e}")
+            raise RuntimeError(f"Failed to list pipelines: {e}")
{pipeline_id}") + + try: + # First, get the pipeline name for logging + pipeline_info = self.get(pipeline_id) + pipeline_name = pipeline_info["name"] + + with self._get_connection() as conn: + cursor = conn.cursor() + + # Delete the pipeline + delete_sql = "DELETE FROM pipelines WHERE id = ?" + cursor.execute(delete_sql, (pipeline_id,)) + + if cursor.rowcount == 0: + raise ValueError(f"Pipeline with id {pipeline_id} not found.") + + conn.commit() + db_logger.info(f"Pipeline {pipeline_name} deleted successfully") + + except ValueError: + # Re-raise ValueError for not found + raise + except Exception as e: + db_logger.error(f"Failed to delete pipeline {pipeline_id}: {e}") + raise RuntimeError(f"Failed to delete pipeline: {e}") + + def update_write_count(self, pipeline_id, count): + """Update the write count for a pipeline""" + db_logger.info(f"Updating write count for pipeline {pipeline_id}: {count}") + + try: + with self._get_connection() as conn: + cursor = conn.cursor() + + update_sql = "UPDATE pipelines SET write_count = ? WHERE id = ?" + cursor.execute(update_sql, (count, pipeline_id)) + + if cursor.rowcount == 0: + raise ValueError(f"Pipeline with id {pipeline_id} not found.") + + conn.commit() + db_logger.debug(f"Write count updated successfully") + + except Exception as e: + db_logger.error(f"Failed to update write count: {e}") + raise RuntimeError(f"Failed to update write count: {e}")