
Commit 1858909

Merge pull request #121 from Teradata/efs-cleanup-testing
Efs cleanup testing
2 parents aa6a72f + e9aaa45 commit 1858909


7 files changed (+469 / -162 lines)


src/teradata_mcp_server/server.py

Lines changed: 60 additions & 160 deletions
@@ -58,12 +58,20 @@
 else:
     config = all_profiles.get(profile_name)

+# Check if the EFS or EVS tools are enabled in the profiles
+if any(re.match(pattern, 'fs_*') for pattern in config.get('tool',[])):
+    _enableEFS = True
+else:
+    _enableEFS = False
+
+if any(re.match(pattern, 'evs_*') for pattern in config.get('tool',[])):
+    _enableEVS = True
+else:
+    _enableEVS = False
+
 # Initialize module loader with profile configuration
 module_loader = td.initialize_module_loader(config)

-# Now initialize the TD connection after module loader is ready
-_tdconn = td.TDConn()
-
 # Set up logging
 os.makedirs("logs", exist_ok=True)
 logging.basicConfig(
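Note on the pattern check above: the profile's `tool` entries are treated as regular expressions, and the flags are derived by testing whether any of them matches the literal family names 'fs_*' / 'evs_*'. A minimal standalone sketch, with a hypothetical profile dict standing in for the parsed profile:

import re

# Hypothetical profile entry; real profiles come from the profile configuration.
config = {"tool": ["fs_.*", "base_.*"]}

_enableEFS = any(re.match(pattern, "fs_*") for pattern in config.get("tool", []))
_enableEVS = any(re.match(pattern, "evs_*") for pattern in config.get("tool", []))

print(_enableEFS, _enableEVS)  # True False for this hypothetical profile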
@@ -78,12 +86,21 @@
 # Connect to MCP server
 mcp = FastMCP("teradata-mcp-server")

+# Now initialize the TD connection after module loader is ready
+_tdconn = td.TDConn()
+
+if _enableEFS:
+    fs_config = td.FeatureStoreConfig()
+    import teradataml as tdml  # import of the teradataml package
+    try:
+        tdml.create_context(tdsqlengine=_tdconn.engine)
+    except Exception as e:
+        logger.warning(f"Error creating teradataml context: {e}")
+
+
 #global shutdown flag
 shutdown_in_progress = False

-# Initiate connection to Teradata (delayed until after module loader is ready)
-_tdconn = None
-
 #afm-defect:
 _enableEVS = False
 # Only attempt to connect to EVS is the system has an EVS installed/configured
@@ -135,6 +152,14 @@ def execute_db_tool(tool, *args, **kwargs):
     if not getattr(_tdconn, "engine", None):
         logger.info("Reinitializing TDConn")
         _tdconn = td.TDConn()
+        if _enableEFS:
+            fs_config = td.FeatureStoreConfig()
+            import teradataml as tdml  # import of the teradataml package
+            try:
+                tdml.create_context(tdsqlengine=_tdconn.engine)
+            except Exception as e:
+                logger.warning(f"Error creating teradataml context: {e}")
+

     # Check is the first argument of the tool is a SQLAlchemy Connection
     sig = inspect.signature(tool)
@@ -188,17 +213,22 @@ def execute_vs_tool(tool, *args, **kwargs) -> ResponseType:
 def make_tool_wrapper(func):
     """
     Given a handle_* function, return an async wrapper with:
-    - the same signature minus the first 'connection' param
-    - a call into execute_db_tool
+    - the same signature minus any 'conn' or 'fs_config' params
+    - injection of fs_config when declared, while conn injection is handled by execute_db_tool
     """
-
     sig = inspect.signature(func)
-    # Drop first param (connection) and skip 'tool_name' if present
-    first_param = next(iter(sig.parameters))
+
+    # Determine which parameters to inject and remove from the exposed signature
+    inject_kwargs = {}
+    removable = {"conn", "tool_name"}
+    if "fs_config" in sig.parameters:
+        inject_kwargs["fs_config"] = fs_config
+        removable.add("fs_config")
+
+    # Build the new signature without injected params
     params = [
         p for name, p in sig.parameters.items()
-        if name != first_param
-        and name != "tool_name"  # skip tool_name if present as this allows to override the tool name for query tools declared in the *_objects.yml files
+        if name not in removable
         and p.kind in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY)
     ]
     new_sig = sig.replace(parameters=params)
@@ -207,8 +237,8 @@ def make_tool_wrapper(func):
     async def wrapper(*args, **kwargs):
         ba = new_sig.bind_partial(*args, **kwargs)
         ba.apply_defaults()
-
-        return execute_db_tool(func, **ba.arguments)
+        # execute_db_tool handles injecting `conn`; we add fs_config if needed
+        return execute_db_tool(func, **inject_kwargs, **ba.arguments)

     wrapper.__signature__ = new_sig
     return wrapper
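The effect of the wrapper change can be seen in isolation. Below is a minimal sketch of the technique under simplified assumptions: a plain dict stands in for the real FeatureStoreConfig, and the handler is called directly instead of going through execute_db_tool. Parameters named conn, tool_name, or fs_config are hidden from the signature exposed to MCP and re-injected at call time.

import asyncio
import inspect

FS_CONFIG = {"db_name": "demo_fs"}  # stand-in for the real FeatureStoreConfig object

def make_tool_wrapper(func):
    sig = inspect.signature(func)
    inject_kwargs = {}
    removable = {"conn", "tool_name"}
    if "fs_config" in sig.parameters:
        inject_kwargs["fs_config"] = FS_CONFIG
        removable.add("fs_config")

    # Keep only the parameters a caller should see
    params = [
        p for name, p in sig.parameters.items()
        if name not in removable
        and p.kind in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY)
    ]
    new_sig = sig.replace(parameters=params)

    async def wrapper(*args, **kwargs):
        ba = new_sig.bind_partial(*args, **kwargs)
        ba.apply_defaults()
        # in the server this goes through execute_db_tool, which supplies 'conn'
        return func(conn="stand-in-connection", **inject_kwargs, **ba.arguments)

    wrapper.__signature__ = new_sig
    return wrapper

def handle_fs_demo(conn, fs_config, db_name: str = "default", *args, **kwargs):
    return f"conn={conn}, fs_config={fs_config}, db_name={db_name}"

tool = make_tool_wrapper(handle_fs_demo)
print(inspect.signature(tool))             # (db_name: str = 'default') — injected params hidden
print(asyncio.run(tool(db_name="sales")))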
@@ -225,7 +255,7 @@ def register_td_tools(config, module_loader, mcp):
     # Get all functions from the loaded modules
     all_functions = module_loader.get_all_functions()
     for name, func in all_functions.items():
-        if not (inspect.isfunction(func) and name.startswith("handle_") and not name.startswith("handle_fs_")):
+        if not (inspect.isfunction(func) and name.startswith("handle_")):
             continue

         tool_name = name[len("handle_"):]
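With the handle_fs_ exclusion dropped, the feature store handlers flow through the same generic registration loop as every other module. A tiny sketch of the relaxed filter (the two handler stubs are illustrative, not the real tool set):

import inspect

def handle_base_readQuery(conn, sql: str, *args, **kwargs): ...
def handle_fs_getFeatures(conn, fs_config, *args, **kwargs): ...

all_functions = {
    "handle_base_readQuery": handle_base_readQuery,
    "handle_fs_getFeatures": handle_fs_getFeatures,
}

for name, func in all_functions.items():
    if not (inspect.isfunction(func) and name.startswith("handle_")):
        continue
    print(name[len("handle_"):])  # base_readQuery, fs_getFeatures — both registered now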
@@ -483,157 +513,27 @@ def get_glossary_term(term_name: str) -> dict:
 #--------------- Feature Store Tools ---------------#
 # Feature tools leveraging the tdfs4ds package.
 # Run only if the EFS tools are defined in the config
-if any(re.match(pattern, 'fs_*') for pattern in config.get('tool',[])):
-    class FeatureStoreConfig(BaseModel):
-        """
-        Configuration class for the feature store. This model defines the metadata and catalog sources
-        used to organize and access features, processes, and datasets across data domains.
-        """
-
-        data_domain: str | None = Field(
-            default=None,
-            description="The data domain associated with the feature store, grouping features within the same namespace."
-        )
-
-        entity: str | None = Field(
-            default=None,
-            description="The list of entities, comma separated and in alphabetical order, upper case."
-        )
-
-        db_name: str | None = Field(
-            default=None,
-            description="Name of the database where the feature store is hosted."
-        )
-
-        feature_catalog: str | None = Field(
-            default=None,
-            description=(
-                "Name of the feature catalog table. "
-                "This table contains detailed metadata about features and entities."
-            )
-        )
-
-        process_catalog: str | None = Field(
-            default=None,
-            description=(
-                "Name of the process catalog table. "
-                "Used to retrieve information about feature generation processes, features, and associated entities."
-            )
-        )
-
-        dataset_catalog: str | None = Field(
-            default=None,
-            description=(
-                "Name of the dataset catalog table. "
-                "Used to list and manage available datasets within the feature store."
-            )
-        )
-
-    fs_config = FeatureStoreConfig()
-
-    @mcp.tool(description="Reconnect to the Teradata database if the connection is lost.")
-    async def reconnect_to_database() -> ResponseType:
-        """Reconnect to Teradata database if connection is lost."""
-        global _tdconn
-        try:
-            _tdconn = td.TDConn()
-            td.teradataml_connection()
-            return format_text_response("Reconnected to Teradata database successfully.")
-        except Exception as e:
-            logger.error(f"Error reconnecting to database: {e}")
-            return format_error_response(str(e))
+if _enableEFS:

     @mcp.tool(description="Set or update the feature store configuration (database and data domain).")
     async def fs_setFeatureStoreConfig(
-        data_domain: str | None = None,
-        db_name: str | None = None,
-        entity: str | None = None,
-    ) -> ResponseType:
-        if db_name:
-            if tdfs4ds.connect(database=db_name):
-                logger.info(f"connected to the feature store of the {db_name} database")
-                # Reset data_domain if DB name changes
-                if not (fs_config.db_name and fs_config.db_name.upper() == db_name.upper()):
-                    fs_config.data_domain = None
-
-                fs_config.db_name = db_name
-                logger.info(f"connected to the feature store of the {db_name} database")
-                fs_config.feature_catalog = f"{db_name}.{tdfs4ds.FEATURE_CATALOG_NAME_VIEW}"
-                logger.info(f"feature catalog {fs_config.feature_catalog}")
-                fs_config.process_catalog = f"{db_name}.{tdfs4ds.PROCESS_CATALOG_NAME_VIEW}"
-                logger.info(f"process catalog {fs_config.process_catalog}")
-                fs_config.dataset_catalog = f"{db_name}.FS_V_FS_DATASET_CATALOG"  # <- fixed line
-                logger.info(f"dataset catalog {fs_config.dataset_catalog}")
-
-        if fs_config.db_name is not None and data_domain is not None:
-            sql_query_ = f"SEL count(*) AS N FROM {fs_config.feature_catalog} WHERE UPPER(data_domain) = '{data_domain.upper()}'"
-            logger.info(f"{sql_query_}")
-            result = tdml.execute_sql(sql_query_)
-            logger.info(f"{result}")
-            if result.fetchall()[0][0] > 0:
-                fs_config.data_domain = data_domain
-            else:
-                fs_config.data_domain = None
-
-        if fs_config.db_name is not None and fs_config.data_domain is not None and entity is not None:
-            sql_query_ = f"SEL count(*) AS N FROM {fs_config.feature_catalog} WHERE UPPER(data_domain) = '{data_domain.upper()}' AND ENTITY_NAME = '{entity.upper()}'"
-            logger.info(f"{sql_query_}")
-            result = tdml.execute_sql(sql_query_)
-            logger.info(f"{result}")
-            if result.fetchall()[0][0] > 0:
-                fs_config.entity = entity
-        return format_text_response(f"Feature store config updated: {fs_config.model_dump(exclude_none=True)}")
+        data_domain: Optional[str] = None,
+        db_name: Optional[str] = None,
+        entity: Optional[str] = None,
+    ) -> td.FeatureStoreConfig:
+        global _tdconn
+        with _tdconn.engine.connect() as conn:
+            return fs_config.fs_setFeatureStoreConfig(
+                conn=conn,
+                db_name=db_name,
+                data_domain=data_domain,
+                entity=entity,
+            )

     @mcp.tool(description="Display the current feature store configuration (database and data domain).")
     async def fs_getFeatureStoreConfig() -> ResponseType:
         return format_text_response(f"Current feature store config: {fs_config.model_dump(exclude_none=True)}")
-
-    @mcp.tool(description=("Check if a feature store is present in the specified database." ))
-    async def fs_isFeatureStorePresent(
-        db_name: str = Field(..., description="Name of the database to check for a feature store.")
-    ) -> ResponseType:
-        return execute_db_tool(td.handle_fs_isFeatureStorePresent, db_name=db_name)
-
-    @mcp.tool(description=("Returns a summary of the feature store content. Use this to understand what data is available in the feature store"))
-    async def fs_featureStoreContent(
-    ) -> ResponseType:
-        return execute_db_tool(td.handle_fs_featureStoreContent, fs_config=fs_config)
-
-    @mcp.tool(description=( "List the available data domains. Requires a configured `db_name` in the feature store config. Use this to explore which entities can be used when building a dataset."))
-    async def fs_getDataDomains(
-    ) -> ResponseType:
-        return execute_db_tool(td.handle_fs_getDataDomains, fs_config=fs_config)
-
-    @mcp.tool(description=("List the list of features. Requires a configured `db_name` and `data_domain` in the feature store config. Use this to explore the features available ."))
-    async def fs_getFeatures(
-    ) -> ResponseType:
-        return execute_db_tool(td.handle_fs_getFeatures, fs_config=fs_config)
-
-    @mcp.tool(description=("List the list of available datasets.Requires a configured `db_name` in the feature store config.Use this to explore the datasets that are available ."))
-    async def fs_getAvailableDatasets(
-    ) -> ResponseType:
-        return execute_db_tool(td.handle_fs_getAvailableDatasets, fs_config=fs_config)
-
-    @mcp.tool(description=("Return the schema of the feature store.Requires a feature store in the configured database (`db_name`)."))
-    async def fs_getFeatureDataModel(
-    ) -> ResponseType:
-        return execute_db_tool(td.handle_fs_getFeatureDataModel, fs_config=fs_config)
-
-
-    @mcp.tool(description=("List the available entities for a given data domain. Requires a configured `db_name` and `data_domain` and `entity` in the feature store config. Use this to explore which entities can be used when building a dataset."))
-    async def fs_getAvailableEntities(
-    ) -> ResponseType:
-        return execute_db_tool(td.handle_fs_getAvailableEntities, fs_config=fs_config)
-
-    @mcp.tool( description=("Create a dataset using selected features and an entity from the feature store. The dataset is created in the specified target database under the given name. Requires a configured feature store and data domain. Registers the dataset in the catalog automatically. Use this when you want to build and register a new dataset for analysis or modeling." ) )
-    async def fs_createDataset(
-        entity_name: str = Field(..., description="Entity for which the dataset will be created. Available entities are reported in the feature catalog."),
-        feature_selection: list[str] = Field(..., description="List of features to include in the dataset. Available features are reported in the feature catalog."),
-        dataset_name: str = Field(..., description="Name of the dataset to create."),
-        target_database: str = Field(..., description="Target database where the dataset will be created.")
-    ) -> ResponseType:
-        return execute_db_tool(td.handle_fs_createDataset, fs_config=fs_config, entity_name=entity_name, feature_selection=feature_selection, dataset_name=dataset_name, target_database=target_database)
-
+

 #------------------ Main ------------------#
 # Main function to start the MCP server
 # Description: Initializes the MCP server and sets up signal handling for graceful shutdown.
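Taken together, this hunk removes the server-side FeatureStoreConfig model and the hand-written fs_* tool bodies; the logic now lives in the handle_fs_* functions, which receive the live connection and the shared fs_config through execute_db_tool. A rough sketch of that delegation pattern, with stubs standing in for the real connection and config objects:

# Stubs only: the real objects are td.TDConn / td.FeatureStoreConfig and the
# handle_fs_* implementations in tools/fs/fs_tools.py.
class FeatureStoreConfig:
    db_name = None
    data_domain = None

def handle_fs_featureStoreContent(conn, fs_config, *args, **kwargs):
    # the real handler queries the feature catalog; this stub just echoes the config
    return {"db_name": fs_config.db_name, "data_domain": fs_config.data_domain}

def execute_db_tool(tool, **kwargs):
    conn = "stand-in-connection"  # execute_db_tool supplies the live connection
    return tool(conn, **kwargs)

fs_config = FeatureStoreConfig()
fs_config.db_name = "demo_fs"
print(execute_db_tool(handle_fs_featureStoreContent, fs_config=fs_config))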
Lines changed: 1 addition & 0 deletions
@@ -1,2 +1,3 @@
 from .fs_resources import *
 from .fs_tools import *
+from .fs_utils import *

src/teradata_mcp_server/tools/fs/fs_tools.py

Lines changed: 40 additions & 2 deletions
@@ -7,6 +7,7 @@

 logger = logging.getLogger("teradata_mcp_server")

+from teradata_mcp_server.tools.utils import serialize_teradata_types, rows_to_json, create_response

 #------------------ Do not make changes above ------------------#

@@ -18,7 +19,12 @@
 # db_name - the database name to check for existence
 # # Returns: True or False
 def handle_fs_isFeatureStorePresent(conn: TeradataConnection, db_name: str, *args, **kwargs):
-
+    """ Check if a feature store is present in the specified database.
+
+    Args:
+        db_name (str): The name of the database to check for the feature store.
+    """
+
     logger.info(f"Tool: handle_fs_isFeatureStorePresent: Args: db_name: {db_name}")

     data = False
@@ -41,6 +47,10 @@ def handle_fs_isFeatureStorePresent(conn: TeradataConnection, db_name: str, *arg
 # conn (TeradataConnection) - Teradata connection object for executing SQL queries
 # # Returns: True or False
 def handle_fs_getDataDomains(conn: TeradataConnection, fs_config, *args, **kwargs):
+    """
+    List the available data domains. Requires a configured `db_name` in the feature store config. Use this to explore which entities can be used when building a dataset.
+    """
+
     db_name = fs_config.db_name
     logger.info(f"Tool: handle_fs_getDataDomains: Args: db_name: {db_name}")

@@ -83,6 +93,10 @@ def handle_fs_getDataDomains(conn: TeradataConnection, fs_config, *args, **kwarg
 # conn (TeradataConnection) - Teradata connection object for executing SQL queries
 # # Returns: True or False
 def handle_fs_featureStoreContent(conn: TeradataConnection, fs_config, *args, **kwargs):
+    """
+    Returns a summary of the feature store content. Use this to understand what data is available in the feature store.
+    """
+
     db_name = fs_config.db_name
     logger.info(f"Tool: handle_fs_featureStoreContent: Args: db_name: {db_name}")
     metadata = {
@@ -122,6 +136,10 @@ def handle_fs_featureStoreContent(conn: TeradataConnection, fs_config, *args, **
 # db_name - the database name to check for existence
 # # Returns: the feature store schema, mainly the catalogs
 def handle_fs_getFeatureDataModel(conn: TeradataConnection, fs_config, *args, **kwargs):
+    """
+    Returns the feature store data model, including the feature catalog, process catalog, and dataset catalog.
+    """
+
     db_name = fs_config.db_name
     logger.info(f"Tool: handle_fs_getFeatureDataModel: Args: db_name: {db_name}")

@@ -163,6 +181,9 @@ def handle_fs_getFeatureDataModel(conn: TeradataConnection, fs_config, *args, **
 # db_name - the database name to check for existence
 # # Returns: True or False
 def handle_fs_getAvailableEntities(conn: TeradataConnection, fs_config, *args, **kwargs):
+    """
+    List the available entities for a given data domain. Requires a configured `db_name` and `data_domain` and `entity` in the feature store config. Use this to explore which entities can be used when building a dataset.
+    """
     db_name = fs_config.db_name
     logger.info(f"Tool: handle_fs_getAvailableEntities: Args: db_name: {db_name}")

@@ -209,6 +230,10 @@ def handle_fs_getAvailableEntities(conn: TeradataConnection, fs_config, *args, *
 # db_name - the database name to check for existence
 # # Returns: True or False
 def handle_fs_getAvailableDatasets(conn: TeradataConnection, fs_config, *args, **kwargs):
+    """
+    List the list of available datasets.Requires a configured `db_name` in the feature store config.Use this to explore the datasets that are available .
+    """
+
     db_name = fs_config.db_name
     logger.info(f"Tool: handle_fs_getAvailableDatasets: Args: db_name: {db_name}")

@@ -242,6 +267,10 @@ def handle_fs_getAvailableDatasets(conn: TeradataConnection, fs_config, *args, *
 # db_name - the database name to check for existence
 # # Returns: True or False
 def handle_fs_getFeatures(conn: TeradataConnection, fs_config, *args, **kwargs):
+    """
+    List the list of features. Requires a configured `db_name` and `data_domain` in the feature store config. Use this to explore the features available .
+    """
+
     db_name = fs_config.db_name
     logger.info(f"Tool: handle_fs_getFeatures: Args: db_name: {db_name}")

@@ -301,7 +330,16 @@ def handle_fs_getFeatures(conn: TeradataConnection, fs_config, *args, **kwargs):
 # conn (TeradataConnection) - Teradata connection object for executing SQL queries
 # db_name - the database name to check for existence
 # # Returns: True or False
-def handle_fs_createDataset(conn: TeradataConnection, fs_config, entity_name: str, feature_selection: str, dataset_name: str, target_database: str, *args, **kwargs):
+def handle_fs_createDataset(conn: TeradataConnection, fs_config, entity_name: str, feature_selection: list[str], dataset_name: str, target_database: str, *args, **kwargs):
+    """
+    Create a dataset using selected features and an entity from the feature store. The dataset is created in the specified target database under the given name. Requires a configured feature store and data domain. Registers the dataset in the catalog automatically. Use this when you want to build and register a new dataset for analysis or modeling.
+    Args:
+        entity_name (str): Entity for which the dataset will be created. Available entities are reported in the feature catalog.
+        feature_selection (list[str]): List of features to include in the dataset. Available features are reported in the feature catalog.
+        dataset_name (str): The name of the dataset to create.
+        target_database (str): The database where the dataset will be created.
+    """
+
     db_name = fs_config.db_name
     logger.info(f"Tool: handle_fs_createDataset: Args: db_name: {db_name}")
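The docstrings added above do double duty: besides documenting the handlers, they give the generic registration path something to surface as a tool description (assuming the MCP framework falls back to the function docstring when no explicit description is supplied). A quick way to see what a handler would advertise:

import inspect

# Copy of one new handler signature with its added docstring; body omitted.
def handle_fs_isFeatureStorePresent(conn, db_name: str, *args, **kwargs):
    """ Check if a feature store is present in the specified database.

    Args:
        db_name (str): The name of the database to check for the feature store.
    """

print(inspect.getdoc(handle_fs_isFeatureStorePresent))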
