Skip to content

Commit 9dcfacc

Browse files
committed
updated EFS test
1 parent 91a9e0f commit 9dcfacc

File tree

1 file changed

+81
-51
lines changed

1 file changed

+81
-51
lines changed

test/EFS/efs_mcp_test.py

Lines changed: 81 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -4,55 +4,85 @@
44
from urllib.parse import urlparse
55
import argparse
66
import os
7+
import asyncio
8+
from langchain_mcp_adapters.client import MultiServerMCPClient
9+
from langchain_mcp_adapters.tools import load_mcp_tools
710

8-
parser = argparse.ArgumentParser(description="Teradata MCP Server")
9-
parser.add_argument('--database_uri', type=str, required=False, help='Database URI to connect to: teradata://username:password@host:1025/schemaname')
10-
parser.add_argument('--action', type=str, choices=['setup', 'cleanup'], required=True, help='Action to perform: setup or cleanup')
11-
# Extract known arguments and load them into the environment if provided
12-
args, unknown = parser.parse_known_args()
13-
14-
database_name = 'demo_user'
15-
connection_url = args.database_uri or os.getenv("DATABASE_URI")
16-
17-
if not connection_url:
18-
raise ValueError("DATABASE_URI must be provided either as an argument or as an environment variable.")
19-
20-
parsed_url = urlparse(connection_url)
21-
user = parsed_url.username
22-
password = parsed_url.password
23-
host = parsed_url.hostname
24-
port = parsed_url.port or 1025
25-
database = parsed_url.path.lstrip('/') or user
26-
27-
eng = create_context(host = host, username=user, password = password)
28-
29-
if args.action=='setup':
30-
# Set up the feature store
31-
tdfs4ds.setup(database=database_name)
32-
tdfs4ds.connect(database=database_name)
33-
34-
# Define the feature store domain
35-
tdfs4ds.DATA_DOMAIN='demo_dba'
36-
tdfs4ds.VARCHAR_SIZE=50
37-
38-
# Create features (table space and skew)
39-
df=DataFrame.from_query("SELECT databasename||'.'||tablename tablename, SUM(currentperm) currentperm, CAST((100-(AVG(currentperm)/MAX(currentperm)*100)) AS DECIMAL(5,2)) AS skew_pct FROM dbc.tablesizev GROUP BY 1;")
40-
df = crystallize_view(df, view_name = 'efs_demo_dba_space', schema_name = database_name,output_view=True)
41-
42-
# upload the features in the physical feature store
43-
tdfs4ds.upload_features(
44-
df,
45-
entity_id = ['tablename'],
46-
feature_names = df.columns[1::],
47-
metadata = {'project': 'dba'}
48-
)
49-
50-
# Display our features
51-
tdfs4ds.feature_catalog()
52-
53-
elif args.action=='cleanup':
54-
list_of_tables = db_list_tables()
55-
[execute_sql(f"DROP VIEW {database_name}.{t}") for t in list_of_tables.TableName if t.startswith('FS_V')]
56-
[execute_sql(f"DROP TABLE {database_name}.{t}") for t in list_of_tables.TableName if t.startswith('FS_T')]
57-
[execute_sql(f"DROP TABLE {database_name}.{t}") for t in list_of_tables.TableName if t.startswith('FS_T')]
58-
[execute_sql(f"DROP TABLE {database_name}.{t}") for t in list_of_tables.TableName if t.startswith('FS_T')]
11+
def main():
12+
parser = argparse.ArgumentParser(description="Teradata MCP Server")
13+
parser.add_argument('--database_uri', type=str, required=False, help='Database URI to connect to: teradata://username:password@host:1025/schemaname')
14+
parser.add_argument('--action', type=str, choices=['setup', 'cleanup', 'test'], required=True, help='Action to perform: setup, test or cleanup')
15+
# Extract known arguments and load them into the environment if provided
16+
args, unknown = parser.parse_known_args()
17+
18+
database_name = 'demo_user'
19+
data_domain = 'demo_dba'
20+
connection_url = args.database_uri or os.getenv("DATABASE_URI")
21+
22+
if not connection_url:
23+
raise ValueError("DATABASE_URI must be provided either as an argument or as an environment variable.")
24+
25+
parsed_url = urlparse(connection_url)
26+
user = parsed_url.username
27+
password = parsed_url.password
28+
host = parsed_url.hostname
29+
port = parsed_url.port or 1025
30+
database = parsed_url.path.lstrip('/') or user
31+
32+
eng = create_context(host = host, username=user, password = password)
33+
34+
if args.action=='setup':
35+
# Set up the feature store
36+
tdfs4ds.setup(database=database_name)
37+
tdfs4ds.connect(database=database_name)
38+
39+
# Define the feature store domain
40+
tdfs4ds.DATA_DOMAIN=data_domain
41+
tdfs4ds.VARCHAR_SIZE=50
42+
43+
# Create features (table space and skew)
44+
df=DataFrame.from_query("SELECT databasename||'.'||tablename tablename, SUM(currentperm) currentperm, CAST((100-(AVG(currentperm)/MAX(currentperm)*100)) AS DECIMAL(5,2)) AS skew_pct FROM dbc.tablesizev GROUP BY 1;")
45+
df = crystallize_view(df, view_name = 'efs_demo_dba_space', schema_name = database_name,output_view=True)
46+
47+
# upload the features in the physical feature store
48+
tdfs4ds.upload_features(
49+
df,
50+
entity_id = ['tablename'],
51+
feature_names = df.columns[1::],
52+
metadata = {'project': 'dba'}
53+
)
54+
55+
# Display our features
56+
tdfs4ds.feature_catalog()
57+
58+
elif args.action == 'test':
59+
# test branch
60+
tdfs4ds.feature_store.schema = database_name
61+
mcp_client = MultiServerMCPClient({
62+
"mcp_server": {
63+
"url": "http://127.0.0.1:8001/mcp",
64+
"transport": "streamable_http"
65+
}
66+
})
67+
async def _test(): # small async helper
68+
async with mcp_client.session("mcp_server") as mcp_session:
69+
tools = await load_mcp_tools(mcp_session)
70+
fs_tools = [t for t in tools if t.name.startswith('fs_')]
71+
print("Available fs_ tools:", [t.name for t in fs_tools])
72+
fs_set_tool = next((t for t in fs_tools if t.name == 'fs_setFeatureStoreConfig'), None)
73+
if not fs_set_tool:
74+
raise RuntimeError('fs_setFeatureStoreConfig tool not found')
75+
response = await fs_set_tool.arun({"data_domain": data_domain, "db_name": database_name})
76+
print("fs_setFeatureStoreConfig response:", response)
77+
import asyncio
78+
asyncio.run(_test())
79+
80+
elif args.action=='cleanup':
81+
list_of_tables = db_list_tables()
82+
[execute_sql(f"DROP VIEW {database_name}.{t}") for t in list_of_tables.TableName if t.startswith('FS_V')]
83+
[execute_sql(f"DROP TABLE {database_name}.{t}") for t in list_of_tables.TableName if t.startswith('FS_T')]
84+
[execute_sql(f"DROP TABLE {database_name}.{t}") for t in list_of_tables.TableName if t.startswith('FS_T')]
85+
[execute_sql(f"DROP TABLE {database_name}.{t}") for t in list_of_tables.TableName if t.startswith('FS_T')]
86+
87+
if __name__ == '__main__':
88+
main()

0 commit comments

Comments
 (0)