
Commit 47a5b16

updated dynamic "handle_" function wrapper to support most EFS tools and moved tool logic from the server to the tools/fs module
1 parent fb3fb64 commit 47a5b16

File tree: 5 files changed (+155, −82 lines)


src/teradata_mcp_server/server.py

Lines changed: 50 additions & 78 deletions

@@ -56,6 +56,19 @@
 else:
     config = all_profiles.get(profile_name)

+# Check if the EFS or EVS tools are enabled in the profiles
+if any(re.match(pattern, 'fs_*') for pattern in config.get('tool',[])):
+    _enableEFS = True
+else:
+    _enableEFS = False
+
+if any(re.match(pattern, 'evs_*') for pattern in config.get('tool',[])):
+    _enableEVS = True
+else:
+    _enableEVS = False
+
+_requireTdmlContext = _enableEFS or _enableEVS
+
 # Set up logging
 os.makedirs("logs", exist_ok=True)
 logging.basicConfig(
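For context, the profile check treats each entry of the profile's `tool` list as a regular expression and tests it against the literal group prefixes `fs_*` / `evs_*`. A minimal, self-contained illustration of that behaviour; the profile values here are made up for the example:

import re

# Hypothetical profile: the 'tool' list holds regex-style patterns such as 'fs_*'.
profile_tools = ['base_*', 'fs_*']

# Mirrors the server.py check: does any configured pattern match the literal 'fs_*'?
_enableEFS = any(re.match(pattern, 'fs_*') for pattern in profile_tools)
_enableEVS = any(re.match(pattern, 'evs_*') for pattern in profile_tools)

print(_enableEFS, _enableEVS)  # True False for this example profile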
@@ -73,13 +86,16 @@
 #global shutdown flag
 shutdown_in_progress = False

-# Initiate connection to Teradata
-_tdconn = td.TDConn()
+# Connect to the Teradata server
+# Initiate base connection
+_tdconn = td.TDConn(tdml_context=_requireTdmlContext)
+
+# If the feature store is enabled, set it up
+if any(re.match(pattern, 'fs_*') for pattern in config.get('tool',[])):
+    fs_config = td.FeatureStoreConfig()

-#afm-defect:
-_enableEVS = False
-# Only attempt to connect to EVS is the system has an EVS installed/configured
-if (len(os.getenv("VS_NAME", "").strip()) > 0):
+# If the enterprise vector store is enabled, set it up
+if _enableEVS and (len(os.getenv("VS_NAME", "").strip()) > 0):
     try:
         _evs = td.get_evs()
         _enableEVS = True
@@ -126,7 +142,7 @@ def execute_db_tool(tool, *args, **kwargs):
     # (Re)initialize if needed
     if not getattr(_tdconn, "engine", None):
         logger.info("Reinitializing TDConn")
-        _tdconn = td.TDConn()
+        _tdconn = td.TDConn(tdml_context=_requireTdmlContext)

     # Check is the first argument of the tool is a SQLAlchemy Connection
     sig = inspect.signature(tool)
@@ -180,17 +196,22 @@ def execute_vs_tool(tool, *args, **kwargs) -> ResponseType:
 def make_tool_wrapper(func):
     """
     Given a handle_* function, return an async wrapper with:
-    - the same signature minus the first 'connection' param
-    - a call into execute_db_tool
+    - the same signature minus any 'conn' or 'fs_config' params
+    - injection of fs_config when declared, while conn injection is handled by execute_db_tool
     """
-
     sig = inspect.signature(func)
-    # Drop first param (connection) and skip 'tool_name' if present
-    first_param = next(iter(sig.parameters))
+
+    # Determine which parameters to inject and remove from the exposed signature
+    inject_kwargs = {}
+    removable = {"conn", "tool_name"}
+    if "fs_config" in sig.parameters:
+        inject_kwargs["fs_config"] = fs_config
+        removable.add("fs_config")
+
+    # Build the new signature without injected params
     params = [
         p for name, p in sig.parameters.items()
-        if name != first_param
-        and name != "tool_name"  # skip tool_name if present as this allows to override the tool name for query tools declared in the *_objects.yml files
+        if name not in removable
         and p.kind in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY)
     ]
     new_sig = sig.replace(parameters=params)
@@ -199,8 +220,8 @@ def make_tool_wrapper(func):
     async def wrapper(*args, **kwargs):
         ba = new_sig.bind_partial(*args, **kwargs)
         ba.apply_defaults()
-
-        return execute_db_tool(func, **ba.arguments)
+        # execute_db_tool handles injecting `conn`; we add fs_config if needed
+        return execute_db_tool(func, **inject_kwargs, **ba.arguments)

     wrapper.__signature__ = new_sig
     return wrapper
@@ -209,7 +230,7 @@ async def wrapper(*args, **kwargs):
 def register_td_tools(config, td, mcp):
     patterns = config.get('tool', [])
     for name, func in inspect.getmembers(td, inspect.isfunction):
-        if not name.startswith("handle_") or name.startswith("handle_fs_"):
+        if not name.startswith("handle_"):  #or name.startswith("handle_fs_")
             continue

         tool_name = name[len("handle_"):]
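Taken together, these hunks let register_td_tools pick up every handle_* function, including the EFS handlers that need fs_config. Below is a condensed, runnable sketch of the wrapper pattern; the handler body, fs_config value, and execute_db_tool stub are illustrative stand-ins rather than the real server objects, and the real wrapper additionally filters parameter kinds:

import asyncio
import inspect

fs_config = {"db_name": "demo_fs"}  # stand-in for td.FeatureStoreConfig()

def handle_fs_getFeatures(conn, fs_config, limit: int = 10):
    # Stand-in handler: the real one runs SQL through `conn`.
    return f"{limit} features from {fs_config['db_name']}"

def execute_db_tool(func, **kwargs):
    # Stand-in for the real helper, which injects a live `conn` before calling.
    return func(conn="fake-conn", **kwargs)

def make_tool_wrapper(func):
    sig = inspect.signature(func)
    inject_kwargs = {}
    removable = {"conn", "tool_name"}
    if "fs_config" in sig.parameters:
        inject_kwargs["fs_config"] = fs_config
        removable.add("fs_config")
    # Hide the injected params from the signature the MCP layer sees
    params = [p for name, p in sig.parameters.items() if name not in removable]
    new_sig = sig.replace(parameters=params)

    async def wrapper(*args, **kwargs):
        ba = new_sig.bind_partial(*args, **kwargs)
        ba.apply_defaults()
        return execute_db_tool(func, **inject_kwargs, **ba.arguments)

    wrapper.__signature__ = new_sig
    return wrapper

tool = make_tool_wrapper(handle_fs_getFeatures)
print(inspect.signature(tool))     # (limit: int = 10) -- conn/fs_config are hidden
print(asyncio.run(tool(limit=3)))  # 3 features from demo_fs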
@@ -456,23 +477,21 @@ def get_glossary_term(term_name: str) -> dict:



+@mcp.tool(description="Reconnect to the Teradata database if the connection is lost.")
+async def reconnect_to_database() -> ResponseType:
+    """Reconnect to Teradata database if connection is lost."""
+    global _tdconn
+    try:
+        _tdconn = td.TDConn(tdml_context=_requireTdmlContext)  # Recreate TDConn with tdml_context enabled
+        return format_text_response("Reconnected to Teradata database successfully.")
+    except Exception as e:
+        logger.error(f"Error reconnecting to database: {e}")
+        return format_error_response(str(e))
+
 #--------------- Feature Store Tools ---------------#
 # Feature tools leveraging the tdfs4ds package.
 # Run only if the EFS tools are defined in the config
-if any(re.match(pattern, 'fs_*') for pattern in config.get('tool',[])):
-    fs_config = td.FeatureStoreConfig()
-
-    @mcp.tool(description="Reconnect to the Teradata database if the connection is lost.")
-    async def reconnect_to_database() -> ResponseType:
-        """Reconnect to Teradata database if connection is lost."""
-        global _tdconn
-        try:
-            _tdconn = td.TDConn()
-            td.teradataml_connection()
-            return format_text_response("Reconnected to Teradata database successfully.")
-        except Exception as e:
-            logger.error(f"Error reconnecting to database: {e}")
-            return format_error_response(str(e))
+if _enableEFS:

     @mcp.tool(description="Set or update the feature store configuration (database and data domain).")
     async def fs_setFeatureStoreConfig(
@@ -519,53 +538,6 @@ async def fs_setFeatureStoreConfig(
     async def fs_getFeatureStoreConfig() -> ResponseType:
         return format_text_response(f"Current feature store config: {fs_config.dict(exclude_none=True)}")

-    @mcp.tool(description=("Check if a feature store is present in the specified database."))
-    async def fs_isFeatureStorePresent(
-        db_name: str = Field(..., description="Name of the database to check for a feature store.")
-    ) -> ResponseType:
-        return execute_db_tool(td.handle_fs_isFeatureStorePresent, db_name=db_name)
-
-    @mcp.tool(description=("Returns a summary of the feature store content. Use this to understand what data is available in the feature store"))
-    async def fs_featureStoreContent(
-    ) -> ResponseType:
-        return execute_db_tool(td.handle_fs_featureStoreContent, fs_config=fs_config)
-
-    @mcp.tool(description=("List the available data domains. Requires a configured `db_name` in the feature store config. Use this to explore which entities can be used when building a dataset."))
-    async def fs_getDataDomains(
-        entity: str = Field(..., description="The .")
-    ) -> ResponseType:
-        return execute_db_tool(td.handle_fs_getDataDomains, fs_config=fs_config)
-
-    @mcp.tool(description=("List the list of features. Requires a configured `db_name` and `data_domain` in the feature store config. Use this to explore the features available ."))
-    async def fs_getFeatures(
-    ) -> ResponseType:
-        return execute_db_tool(td.handle_fs_getFeatures, fs_config=fs_config)
-
-    @mcp.tool(description=("List the list of available datasets.Requires a configured `db_name` in the feature store config.Use this to explore the datasets that are available ."))
-    async def fs_getAvailableDatasets(
-    ) -> ResponseType:
-        return execute_db_tool(td.handle_fs_getAvailableDatasets, fs_config=fs_config)
-
-    @mcp.tool(description=("Return the schema of the feature store.Requires a feature store in the configured database (`db_name`)."))
-    async def fs_getFeatureDataModel(
-    ) -> ResponseType:
-        return execute_db_tool(td.handle_fs_getFeatureDataModel, fs_config=fs_config)
-
-
-    @mcp.tool(description=("List the available entities for a given data domain. Requires a configured `db_name` and `data_domain` and `entity` in the feature store config. Use this to explore which entities can be used when building a dataset."))
-    async def fs_getAvailableEntities(
-    ) -> ResponseType:
-        return execute_db_tool(td.handle_fs_getAvailableEntities, fs_config=fs_config)
-
-    @mcp.tool(description=("Create a dataset using selected features and an entity from the feature store. The dataset is created in the specified target database under the given name. Requires a configured feature store and data domain. Registers the dataset in the catalog automatically. Use this when you want to build and register a new dataset for analysis or modeling."))
-    async def fs_createDataset(
-        entity_name: str = Field(..., description="Entity for which the dataset will be created. Available entities are reported in the feature catalog."),
-        feature_selection: list[str] = Field(..., description="List of features to include in the dataset. Available features are reported in the feature catalog."),
-        dataset_name: str = Field(..., description="Name of the dataset to create."),
-        target_database: str = Field(..., description="Target database where the dataset will be created.")
-    ) -> ResponseType:
-        return execute_db_tool(td.handle_fs_createDataset, fs_config=fs_config, entity_name=entity_name, feature_selection=feature_selection, dataset_name=dataset_name, target_database=target_database)
-
 #------------------ Main ------------------#
 # Main function to start the MCP server
 # Description: Initializes the MCP server and sets up signal handling for graceful shutdown.

src/teradata_mcp_server/tools/fs/fs_tools.py

Lines changed: 36 additions & 0 deletions

@@ -21,6 +21,11 @@
 # db_name - the database name to check for existence
 # # Returns: True or False
 def handle_fs_isFeatureStorePresent(conn: TeradataConnection, db_name: str, *args, **kwargs):
+    """Check if a feature store is present in the specified database.
+
+    Args:
+        db_name (str): The name of the database to check for the feature store.
+    """

     logger.info(f"Tool: handle_fs_isFeatureStorePresent: Args: db_name: {db_name}")

@@ -44,6 +49,10 @@ def handle_fs_isFeatureStorePresent(conn: TeradataConnection, db_name: str, *args, **kwargs):
 # conn (TeradataConnection) - Teradata connection object for executing SQL queries
 # # Returns: True or False
 def handle_fs_getDataDomains(conn: TeradataConnection, fs_config, *args, **kwargs):
+    """
+    List the available data domains. Requires a configured `db_name` in the feature store config. Use this to explore which entities can be used when building a dataset.
+    """
+
     db_name = fs_config.db_name
     logger.info(f"Tool: handle_fs_getDataDomains: Args: db_name: {db_name}")

@@ -86,6 +95,10 @@ def handle_fs_getDataDomains(conn: TeradataConnection, fs_config, *args, **kwargs):
 # conn (TeradataConnection) - Teradata connection object for executing SQL queries
 # # Returns: True or False
 def handle_fs_featureStoreContent(conn: TeradataConnection, fs_config, *args, **kwargs):
+    """
+    Returns a summary of the feature store content. Use this to understand what data is available in the feature store.
+    """
+
     db_name = fs_config.db_name
     logger.info(f"Tool: handle_fs_featureStoreContent: Args: db_name: {db_name}")
     metadata = {

@@ -125,6 +138,10 @@ def handle_fs_featureStoreContent(conn: TeradataConnection, fs_config, *args, **kwargs):
 # db_name - the database name to check for existence
 # # Returns: the feature store schema, mainly the catalogs
 def handle_fs_getFeatureDataModel(conn: TeradataConnection, fs_config, *args, **kwargs):
+    """
+    Returns the feature store data model, including the feature catalog, process catalog, and dataset catalog.
+    """
+
     db_name = fs_config.db_name
     logger.info(f"Tool: handle_fs_getFeatureDataModel: Args: db_name: {db_name}")

@@ -166,6 +183,9 @@ def handle_fs_getFeatureDataModel(conn: TeradataConnection, fs_config, *args, **kwargs):
 # db_name - the database name to check for existence
 # # Returns: True or False
 def handle_fs_getAvailableEntities(conn: TeradataConnection, fs_config, *args, **kwargs):
+    """
+    List the available entities for a given data domain. Requires a configured `db_name`, `data_domain`, and `entity` in the feature store config. Use this to explore which entities can be used when building a dataset.
+    """
     db_name = fs_config.db_name
     logger.info(f"Tool: handle_fs_getAvailableEntities: Args: db_name: {db_name}")

@@ -212,6 +232,9 @@ def handle_fs_getAvailableEntities(conn: TeradataConnection, fs_config, *args, **kwargs):
 # db_name - the database name to check for existence
 # # Returns: True or False
 def handle_fs_getAvailableDatasets(conn: TeradataConnection, fs_config, *args, **kwargs):
+    """
+    List the available datasets. Requires a configured `db_name` in the feature store config. Use this to explore the datasets that are available.
+    """
     db_name = fs_config.db_name
     logger.info(f"Tool: handle_fs_getAvailableDatasets: Args: db_name: {db_name}")

@@ -245,6 +268,10 @@ def handle_fs_getAvailableDatasets(conn: TeradataConnection, fs_config, *args, **kwargs):
 # db_name - the database name to check for existence
 # # Returns: True or False
 def handle_fs_getFeatures(conn: TeradataConnection, fs_config, *args, **kwargs):
+    """
+    List the features. Requires a configured `db_name` and `data_domain` in the feature store config. Use this to explore the features available.
+    """
+
     db_name = fs_config.db_name
     logger.info(f"Tool: handle_fs_getFeatures: Args: db_name: {db_name}")

@@ -305,6 +332,15 @@ def handle_fs_getFeatures(conn: TeradataConnection, fs_config, *args, **kwargs):
 # db_name - the database name to check for existence
 # # Returns: True or False
 def handle_fs_createDataset(conn: TeradataConnection, fs_config, entity_name: str, feature_selection: str, dataset_name: str, target_database: str, *args, **kwargs):
+    """
+    Create a dataset using selected features and an entity from the feature store. The dataset is created in the specified target database under the given name. Requires a configured feature store and data domain. Registers the dataset in the catalog automatically. Use this when you want to build and register a new dataset for analysis or modeling.
+    Args:
+        entity_name (str): Entity for which the dataset will be created. Available entities are reported in the feature catalog.
+        feature_selection (list[str]): List of features to include in the dataset. Available features are reported in the feature catalog.
+        dataset_name (str): The name of the dataset to create.
+        target_database (str): The database where the dataset will be created.
+    """
+
     db_name = fs_config.db_name
     logger.info(f"Tool: handle_fs_createDataset: Args: db_name: {db_name}")
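These handler signatures are what the generic wrapper in server.py now works from: `conn` and `fs_config` are stripped from the exposed tool signature and injected at call time, while the new docstrings document each handler in place. A quick, illustrative check; the import path is assumed from this file's location:

import inspect

# Assumed import path, based on src/teradata_mcp_server/tools/fs/fs_tools.py
from teradata_mcp_server.tools.fs import fs_tools

func = fs_tools.handle_fs_getFeatures
print(func.__doc__)               # the docstring added in this commit
print(inspect.signature(func))    # (conn: TeradataConnection, fs_config, *args, **kwargs)
# After wrapping, only tool-specific parameters (none, for this handler) remain visible.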

src/teradata_mcp_server/tools/td_connect.py

Lines changed: 6 additions & 4 deletions

@@ -21,11 +21,12 @@
 class TDConn:
     engine: Optional[Engine] = None
     connection_url: Optional[str] = None
+    tdml_context: Optional[bool] = False

     # Constructor
     # It will read the connection URL from the environment variable DATABASE_URI
     # It will parse the connection URL and create a SQLAlchemy engine connected to the database
-    def __init__(self, connection_url: Optional[str] = None):
+    def __init__(self, connection_url: Optional[str] = None, tdml_context: Optional[bool] = False):
         if connection_url is None and os.getenv("DATABASE_URI") is None:
             logger.warning("DATABASE_URI is not specified, database connection will not be established.")
             self.engine = None
@@ -64,9 +65,10 @@ def __init__(self, connection_url: Optional[str] = None):
             logger.error(f"Error creating database engine: {e}")
             self.engine = None

-        # Create the teradataml context
-        if "<EVS or EFS enabled>":
-            import teradataml as tdml  # import of the teradataml package
+        # Create the teradataml context
+        self.tdml_context = tdml_context
+        if self.tdml_context:
+            import teradataml as tdml  # import of the teradataml package
             tdml.create_context(tdsqlengine=self.engine)

     # Destructor
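For reference, a minimal sketch of how the new flag is intended to be used from the server side; the import path and connection string are placeholders, and teradataml must be installed and the database reachable for the tdml_context=True case:

import os
# Assumed import path for this module
from teradata_mcp_server.tools.td_connect import TDConn

os.environ.setdefault("DATABASE_URI", "teradata://username:password@host:1025/schemaname")

# Plain SQLAlchemy-only connection: teradataml is never imported.
conn = TDConn()

# EFS/EVS profiles pass tdml_context=True, which additionally calls
# teradataml.create_context() on the same engine.
conn_with_tdml = TDConn(tdml_context=True)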

test/EFS/efs_mcp_test.py

Lines changed: 58 additions & 0 deletions

@@ -0,0 +1,58 @@
+import tdfs4ds
+from tdfs4ds.utils.lineage import crystallize_view
+from teradataml import create_context, DataFrame, execute_sql, db_list_tables
+from urllib.parse import urlparse
+import argparse
+import os
+
+parser = argparse.ArgumentParser(description="Teradata MCP Server")
+parser.add_argument('--database_uri', type=str, required=False, help='Database URI to connect to: teradata://username:password@host:1025/schemaname')
+parser.add_argument('--action', type=str, choices=['setup', 'cleanup'], required=True, help='Action to perform: setup or cleanup')
+# Extract known arguments and load them into the environment if provided
+args, unknown = parser.parse_known_args()
+
+database_name = 'demo_user'
+connection_url = args.database_uri or os.getenv("DATABASE_URI")
+
+if not connection_url:
+    raise ValueError("DATABASE_URI must be provided either as an argument or as an environment variable.")
+
+parsed_url = urlparse(connection_url)
+user = parsed_url.username
+password = parsed_url.password
+host = parsed_url.hostname
+port = parsed_url.port or 1025
+database = parsed_url.path.lstrip('/') or user
+
+eng = create_context(host=host, username=user, password=password)
+
+if args.action == 'setup':
+    # Set up the feature store
+    tdfs4ds.setup(database=database_name)
+    tdfs4ds.connect(database=database_name)
+
+    # Define the feature store domain
+    tdfs4ds.DATA_DOMAIN = 'demo_dba'
+    tdfs4ds.VARCHAR_SIZE = 50
+
+    # Create features (table space and skew)
+    df = DataFrame.from_query("SELECT databasename||'.'||tablename tablename, SUM(currentperm) currentperm, CAST((100-(AVG(currentperm)/MAX(currentperm)*100)) AS DECIMAL(5,2)) AS skew_pct FROM dbc.tablesizev GROUP BY 1;")
+    df = crystallize_view(df, view_name='efs_demo_dba_space', schema_name=database_name, output_view=True)
+
+    # Upload the features into the physical feature store
+    tdfs4ds.upload_features(
+        df,
+        entity_id=['tablename'],
+        feature_names=df.columns[1::],
+        metadata={'project': 'dba'}
+    )
+
+    # Display our features
+    tdfs4ds.feature_catalog()
+
+elif args.action == 'cleanup':
+    list_of_tables = db_list_tables()
+    [execute_sql(f"DROP VIEW {database_name}.{t}") for t in list_of_tables.TableName if t.startswith('FS_V')]
+    [execute_sql(f"DROP TABLE {database_name}.{t}") for t in list_of_tables.TableName if t.startswith('FS_T')]
+    [execute_sql(f"DROP TABLE {database_name}.{t}") for t in list_of_tables.TableName if t.startswith('FS_T')]
+    [execute_sql(f"DROP TABLE {database_name}.{t}") for t in list_of_tables.TableName if t.startswith('FS_T')]

test/EFS/requirements.txt

Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+tdfs4ds
+teradataml
+pandas
+json
+sqlalchemy
