|
58 | 58 | else: |
59 | 59 | config = all_profiles.get(profile_name) |
60 | 60 |
|
| 61 | +# Check if the EFS or EVS tools are enabled in the profiles |
| 62 | +if any(re.match(pattern, 'fs_*') for pattern in config.get('tool',[])): |
| 63 | + _enableEFS = True |
| 64 | +else: |
| 65 | + _enableEFS = False |
| 66 | + |
| 67 | +if any(re.match(pattern, 'evs_*') for pattern in config.get('tool',[])): |
| 68 | + _enableEVS = True |
| 69 | +else: |
| 70 | + _enableEVS = False |
| 71 | + |
61 | 72 | # Initialize module loader with profile configuration |
62 | 73 | module_loader = td.initialize_module_loader(config) |
63 | 74 |
|
64 | | -# Now initialize the TD connection after module loader is ready |
65 | | -_tdconn = td.TDConn() |
66 | | - |
67 | 75 | # Set up logging |
68 | 76 | os.makedirs("logs", exist_ok=True) |
69 | 77 | logging.basicConfig( |
|
78 | 86 | # Connect to MCP server |
79 | 87 | mcp = FastMCP("teradata-mcp-server") |
80 | 88 |
|
| 89 | +# Now initialize the TD connection after module loader is ready |
| 90 | +_tdconn = td.TDConn() |
| 91 | + |
| 92 | +if _enableEFS: |
| 93 | + fs_config = td.FeatureStoreConfig() |
| 94 | + import teradataml as tdml # import of the teradataml package |
| 95 | + try: |
| 96 | + tdml.create_context(tdsqlengine=_tdconn.engine) |
| 97 | + except Exception as e: |
| 98 | + logger.warning(f"Error creating teradataml context: {e}") |
| 99 | + |
| 100 | + |
81 | 101 | #global shutdown flag |
82 | 102 | shutdown_in_progress = False |
83 | 103 |
|
84 | | -# Initiate connection to Teradata (delayed until after module loader is ready) |
85 | | -_tdconn = None |
86 | | - |
87 | 104 | #afm-defect: |
88 | 105 | _enableEVS = False |
89 | 106 | # Only attempt to connect to EVS is the system has an EVS installed/configured |
@@ -138,7 +155,10 @@ def execute_db_tool(tool, *args, **kwargs): |
138 | 155 | if _enableEFS: |
139 | 156 | fs_config = td.FeatureStoreConfig() |
140 | 157 | import teradataml as tdml # import of the teradataml package |
141 | | - tdml.create_context(tdsqlengine=_tdconn.engine) |
| 158 | + try: |
| 159 | + tdml.create_context(tdsqlengine=_tdconn.engine) |
| 160 | + except Exception as e: |
| 161 | + logger.warning(f"Error creating teradataml context: {e}") |
142 | 162 |
|
143 | 163 |
|
144 | 164 | # Check is the first argument of the tool is a SQLAlchemy Connection |
@@ -235,7 +255,7 @@ def register_td_tools(config, module_loader, mcp): |
235 | 255 | # Get all functions from the loaded modules |
236 | 256 | all_functions = module_loader.get_all_functions() |
237 | 257 | for name, func in all_functions.items(): |
238 | | - if not (inspect.isfunction(func) and name.startswith("handle_") and not name.startswith("handle_fs_")): |
| 258 | + if not (inspect.isfunction(func) and name.startswith("handle_")): |
239 | 259 | continue |
240 | 260 |
|
241 | 261 | tool_name = name[len("handle_"):] |
@@ -493,169 +513,27 @@ def get_glossary_term(term_name: str) -> dict: |
493 | 513 | #--------------- Feature Store Tools ---------------# |
494 | 514 | # Feature tools leveraging the tdfs4ds package. |
495 | 515 | # Run only if the EFS tools are defined in the config |
496 | | -if any(re.match(pattern, 'fs_*') for pattern in config.get('tool',[])): |
497 | | - class FeatureStoreConfig(BaseModel): |
498 | | - """ |
499 | | - Configuration class for the feature store. This model defines the metadata and catalog sources |
500 | | - used to organize and access features, processes, and datasets across data domains. |
501 | | - """ |
502 | | - |
503 | | - data_domain: str | None = Field( |
504 | | - default=None, |
505 | | - description="The data domain associated with the feature store, grouping features within the same namespace." |
506 | | - ) |
507 | | - |
508 | | - entity: str | None = Field( |
509 | | - default=None, |
510 | | - description="The list of entities, comma separated and in alphabetical order, upper case." |
511 | | - ) |
512 | | - |
513 | | - db_name: str | None = Field( |
514 | | - default=None, |
515 | | - description="Name of the database where the feature store is hosted." |
516 | | - ) |
517 | | - |
518 | | - feature_catalog: str | None = Field( |
519 | | - default=None, |
520 | | - description=( |
521 | | - "Name of the feature catalog table. " |
522 | | - "This table contains detailed metadata about features and entities." |
523 | | - ) |
524 | | - ) |
525 | | - |
526 | | - process_catalog: str | None = Field( |
527 | | - default=None, |
528 | | - description=( |
529 | | - "Name of the process catalog table. " |
530 | | - "Used to retrieve information about feature generation processes, features, and associated entities." |
531 | | - ) |
532 | | - ) |
533 | | - |
534 | | - dataset_catalog: str | None = Field( |
535 | | - default=None, |
536 | | - description=( |
537 | | - "Name of the dataset catalog table. " |
538 | | - "Used to list and manage available datasets within the feature store." |
539 | | - ) |
540 | | - ) |
541 | | - |
542 | | - fs_config = FeatureStoreConfig() |
543 | | - |
544 | | - @mcp.tool(description="Reconnect to the Teradata database if the connection is lost.") |
545 | | - async def reconnect_to_database() -> ResponseType: |
546 | | - """Reconnect to Teradata database if connection is lost.""" |
547 | | - global _tdconn |
548 | | - try: |
549 | | - _tdconn = td.TDConn() |
550 | | - td.teradataml_connection() |
551 | | - return format_text_response("Reconnected to Teradata database successfully.") |
552 | | - except Exception as e: |
553 | | - logger.error(f"Error reconnecting to database: {e}") |
554 | | - return format_error_response(str(e)) |
| 516 | +if _enableEFS: |
555 | 517 |
|
556 | 518 | @mcp.tool(description="Set or update the feature store configuration (database and data domain).") |
557 | 519 | async def fs_setFeatureStoreConfig( |
558 | 520 | data_domain: Optional[str] = None, |
559 | 521 | db_name: Optional[str] = None, |
560 | 522 | entity: Optional[str] = None, |
561 | 523 | ) -> td.FeatureStoreConfig: |
| 524 | + global _tdconn |
562 | 525 | with _tdconn.engine.connect() as conn: |
563 | 526 | return fs_config.fs_setFeatureStoreConfig( |
564 | 527 | conn=conn, |
565 | 528 | db_name=db_name, |
566 | 529 | data_domain=data_domain, |
567 | 530 | entity=entity, |
568 | 531 | ) |
569 | | - data_domain: str | None = None, |
570 | | - db_name: str | None = None, |
571 | | - entity: str | None = None, |
572 | | - ) -> ResponseType: |
573 | | - if db_name: |
574 | | - if tdfs4ds.connect(database=db_name): |
575 | | - logger.info(f"connected to the feature store of the {db_name} database") |
576 | | - # Reset data_domain if DB name changes |
577 | | - if not (fs_config.db_name and fs_config.db_name.upper() == db_name.upper()): |
578 | | - fs_config.data_domain = None |
579 | | - |
580 | | - fs_config.db_name = db_name |
581 | | - logger.info(f"connected to the feature store of the {db_name} database") |
582 | | - fs_config.feature_catalog = f"{db_name}.{tdfs4ds.FEATURE_CATALOG_NAME_VIEW}" |
583 | | - logger.info(f"feature catalog {fs_config.feature_catalog}") |
584 | | - fs_config.process_catalog = f"{db_name}.{tdfs4ds.PROCESS_CATALOG_NAME_VIEW}" |
585 | | - logger.info(f"process catalog {fs_config.process_catalog}") |
586 | | - fs_config.dataset_catalog = f"{db_name}.FS_V_FS_DATASET_CATALOG" # <- fixed line |
587 | | - logger.info(f"dataset catalog {fs_config.dataset_catalog}") |
588 | | - |
589 | | - if fs_config.db_name is not None and data_domain is not None: |
590 | | - sql_query_ = f"SEL count(*) AS N FROM {fs_config.feature_catalog} WHERE UPPER(data_domain) = '{data_domain.upper()}'" |
591 | | - logger.info(f"{sql_query_}") |
592 | | - result = tdml.execute_sql(sql_query_) |
593 | | - logger.info(f"{result}") |
594 | | - if result.fetchall()[0][0] > 0: |
595 | | - fs_config.data_domain = data_domain |
596 | | - else: |
597 | | - fs_config.data_domain = None |
598 | | - |
599 | | - if fs_config.db_name is not None and fs_config.data_domain is not None and entity is not None: |
600 | | - sql_query_ = f"SEL count(*) AS N FROM {fs_config.feature_catalog} WHERE UPPER(data_domain) = '{data_domain.upper()}' AND ENTITY_NAME = '{entity.upper()}'" |
601 | | - logger.info(f"{sql_query_}") |
602 | | - result = tdml.execute_sql(sql_query_) |
603 | | - logger.info(f"{result}") |
604 | | - if result.fetchall()[0][0] > 0: |
605 | | - fs_config.entity = entity |
606 | | - return format_text_response(f"Feature store config updated: {fs_config.model_dump(exclude_none=True)}") |
607 | 532 |
|
608 | 533 | @mcp.tool(description="Display the current feature store configuration (database and data domain).") |
609 | 534 | async def fs_getFeatureStoreConfig() -> ResponseType: |
610 | 535 | return format_text_response(f"Current feature store config: {fs_config.model_dump(exclude_none=True)}") |
611 | | - return format_text_response(f"Current feature store config: {fs_config.model_dump(exclude_none=True)}") |
612 | | - |
613 | | - @mcp.tool(description=("Check if a feature store is present in the specified database." )) |
614 | | - async def fs_isFeatureStorePresent( |
615 | | - db_name: str = Field(..., description="Name of the database to check for a feature store.") |
616 | | - ) -> ResponseType: |
617 | | - return execute_db_tool(td.handle_fs_isFeatureStorePresent, db_name=db_name) |
618 | | - |
619 | | - @mcp.tool(description=("Returns a summary of the feature store content. Use this to understand what data is available in the feature store")) |
620 | | - async def fs_featureStoreContent( |
621 | | - ) -> ResponseType: |
622 | | - return execute_db_tool(td.handle_fs_featureStoreContent, fs_config=fs_config) |
623 | | - |
624 | | - @mcp.tool(description=( "List the available data domains. Requires a configured `db_name` in the feature store config. Use this to explore which entities can be used when building a dataset.")) |
625 | | - async def fs_getDataDomains( |
626 | | - ) -> ResponseType: |
627 | | - return execute_db_tool(td.handle_fs_getDataDomains, fs_config=fs_config) |
628 | | - |
629 | | - @mcp.tool(description=("List the list of features. Requires a configured `db_name` and `data_domain` in the feature store config. Use this to explore the features available .")) |
630 | | - async def fs_getFeatures( |
631 | | - ) -> ResponseType: |
632 | | - return execute_db_tool(td.handle_fs_getFeatures, fs_config=fs_config) |
633 | | - |
634 | | - @mcp.tool(description=("List the list of available datasets.Requires a configured `db_name` in the feature store config.Use this to explore the datasets that are available .")) |
635 | | - async def fs_getAvailableDatasets( |
636 | | - ) -> ResponseType: |
637 | | - return execute_db_tool(td.handle_fs_getAvailableDatasets, fs_config=fs_config) |
638 | | - |
639 | | - @mcp.tool(description=("Return the schema of the feature store.Requires a feature store in the configured database (`db_name`).")) |
640 | | - async def fs_getFeatureDataModel( |
641 | | - ) -> ResponseType: |
642 | | - return execute_db_tool(td.handle_fs_getFeatureDataModel, fs_config=fs_config) |
643 | | - |
644 | | - |
645 | | - @mcp.tool(description=("List the available entities for a given data domain. Requires a configured `db_name` and `data_domain` and `entity` in the feature store config. Use this to explore which entities can be used when building a dataset.")) |
646 | | - async def fs_getAvailableEntities( |
647 | | - ) -> ResponseType: |
648 | | - return execute_db_tool(td.handle_fs_getAvailableEntities, fs_config=fs_config) |
649 | | - |
650 | | - @mcp.tool( description=("Create a dataset using selected features and an entity from the feature store. The dataset is created in the specified target database under the given name. Requires a configured feature store and data domain. Registers the dataset in the catalog automatically. Use this when you want to build and register a new dataset for analysis or modeling." ) ) |
651 | | - async def fs_createDataset( |
652 | | - entity_name: str = Field(..., description="Entity for which the dataset will be created. Available entities are reported in the feature catalog."), |
653 | | - feature_selection: list[str] = Field(..., description="List of features to include in the dataset. Available features are reported in the feature catalog."), |
654 | | - dataset_name: str = Field(..., description="Name of the dataset to create."), |
655 | | - target_database: str = Field(..., description="Target database where the dataset will be created.") |
656 | | - ) -> ResponseType: |
657 | | - return execute_db_tool(td.handle_fs_createDataset, fs_config=fs_config, entity_name=entity_name, feature_selection=feature_selection, dataset_name=dataset_name, target_database=target_database) |
658 | | - |
| 536 | + |
659 | 537 | #------------------ Main ------------------# |
660 | 538 | # Main function to start the MCP server |
661 | 539 | # Description: Initializes the MCP server and sets up signal handling for graceful shutdown. |
|
0 commit comments