@@ -19,17 +19,18 @@ def main():
1919 data_domain = 'demo_dba'
2020 connection_url = args .database_uri or os .getenv ("DATABASE_URI" )
2121
22- if not connection_url :
23- raise ValueError ("DATABASE_URI must be provided either as an argument or as an environment variable." )
22+ if args .action in ['setup' , 'cleanup' ]:
23+ if not connection_url :
24+ raise ValueError ("DATABASE_URI must be provided either as an argument or as an environment variable." )
2425
25- parsed_url = urlparse (connection_url )
26- user = parsed_url .username
27- password = parsed_url .password
28- host = parsed_url .hostname
29- port = parsed_url .port or 1025
30- database = parsed_url .path .lstrip ('/' ) or user
26+ parsed_url = urlparse (connection_url )
27+ user = parsed_url .username
28+ password = parsed_url .password
29+ host = parsed_url .hostname
30+ port = parsed_url .port or 1025
31+ database = parsed_url .path .lstrip ('/' ) or user
3132
32- eng = create_context (host = host , username = user , password = password )
33+ eng = create_context (host = host , username = user , password = password )
3334
3435 if args .action == 'setup' :
3536 # Set up the feature store
@@ -64,18 +65,123 @@ def main():
6465 "transport" : "streamable_http"
6566 }
6667 })
67- async def _test (): # small async helper
68+ async def runner ():
6869 async with mcp_client .session ("mcp_server" ) as mcp_session :
6970 tools = await load_mcp_tools (mcp_session )
7071 fs_tools = [t for t in tools if t .name .startswith ('fs_' )]
7172 print ("Available fs_ tools:" , [t .name for t in fs_tools ])
72- fs_set_tool = next ((t for t in fs_tools if t .name == 'fs_setFeatureStoreConfig' ), None )
73- if not fs_set_tool :
74- raise RuntimeError ('fs_setFeatureStoreConfig tool not found' )
75- response = await fs_set_tool .arun ({"data_domain" : data_domain , "db_name" : database_name })
76- print ("fs_setFeatureStoreConfig response:" , response )
77- import asyncio
78- asyncio .run (_test ())
73+
74+ # Map tool names for quick access
75+ tool_by_name = {t .name : t for t in fs_tools }
76+
77+ import json as _json
78+
79+ async def _call (name : str , payload : dict | None = None ):
80+ if name not in tool_by_name :
81+ raise RuntimeError (f"Tool { name } not found" )
82+ tool = tool_by_name [name ]
83+ tool_input = payload or {}
84+ # StructuredTool expects a single positional/named argument: tool_input
85+ resp = await tool .arun (tool_input = tool_input )
86+ # Try to parse JSON text if needed
87+ if isinstance (resp , str ):
88+ try :
89+ return _json .loads (resp )
90+ except Exception :
91+ return resp
92+ return resp
93+
94+ # 1) fs_isFeatureStorePresent
95+ print ("\n [1/8] fs_isFeatureStorePresent…" )
96+ r1 = await _call ('fs_isFeatureStorePresent' , {"db_name" : database_name })
97+ print ("fs_isFeatureStorePresent →" , r1 )
98+
99+ # 2) fs_setFeatureStoreConfig
100+ print ("\n [2/8] fs_setFeatureStoreConfig…" )
101+ r_set = await _call ('fs_setFeatureStoreConfig' , {"db_name" : database_name , "data_domain" : data_domain })
102+ print ("fs_setFeatureStoreConfig →" , r_set )
103+
104+ # 3) fs_getDataDomains
105+ print ("\n [3/8] fs_getDataDomains…" )
106+ r2 = await _call ('fs_getDataDomains' )
107+ print ("fs_getDataDomains →" , r2 )
108+
109+ # 4) fs_getAvailableDatasets
110+ print ("\n [4/8] fs_getAvailableDatasets…" )
111+ r3 = await _call ('fs_getAvailableDatasets' )
112+ print ("fs_getAvailableDatasets →" , r3 )
113+
114+ # 5) fs_getAvailableEntities
115+ print ("\n [5/8] fs_getAvailableEntities…" )
116+ r4 = await _call ('fs_getAvailableEntities' )
117+ print ("fs_getAvailableEntities →" , r4 )
118+
119+ # 6) fs_setFeatureStoreConfig (entity)
120+ print ("\n [6/8] fs_setFeatureStoreConfig (entity)…" )
121+ def _extract_entity_name (payload ):
122+ res = payload .get ("results" ) if isinstance (payload , dict ) else payload
123+ if isinstance (res , list ) and res and isinstance (res [0 ], dict ):
124+ for key in ("ENTITY_NAME" , "entity_name" , "entity" , "name" ):
125+ if key in res [0 ] and res [0 ][key ]:
126+ return res [0 ][key ]
127+ if isinstance (res , str ):
128+ lines = [ln .strip () for ln in res .splitlines () if ln .strip ()]
129+ if lines :
130+ parts = lines [- 1 ].split ()
131+ if parts :
132+ return parts [- 1 ]
133+ return None
134+ entity_name = _extract_entity_name (r4 ) or "tablename"
135+ r_set_entity = await _call ('fs_setFeatureStoreConfig' , {"entity" : entity_name })
136+ print ("fs_setFeatureStoreConfig (entity) →" , r_set_entity )
137+
138+ # 7) fs_getFeatures
139+ print ("\n [7/8] fs_getFeatures…" )
140+ r5 = await _call ('fs_getFeatures' )
141+ print ("fs_getFeatures →" , r5 )
142+
143+ # Extract feature names from r5
144+ def _extract_feature_names (payload ):
145+ # Accept either {"results": [...]} or a raw list
146+ items = payload .get ("results" ) if isinstance (payload , dict ) else payload
147+ if not isinstance (items , list ):
148+ return []
149+ names = []
150+ for row in items :
151+ if not isinstance (row , dict ):
152+ continue
153+ for key in ("feature_name" , "FEATURE_NAME" , "name" , "FEATURE" , "feature" ):
154+ if key in row and row [key ] is not None :
155+ names .append (row [key ])
156+ break
157+ return names
158+
159+ feature_selection = _extract_feature_names (r5 )
160+ print (f"Extracted { len (feature_selection )} feature names for dataset creation" )
161+
162+ # 8) fs_createDataset
163+ print ("\n [8/8] fs_createDataset…" )
164+ create_payload = {
165+ "entity_name" : entity_name ,
166+ "feature_selection" : feature_selection ,
167+ "dataset_name" : "test_efs_dataset" ,
168+ "target_database" : database_name ,
169+ }
170+ print ("fs_createDataset payload:" , create_payload )
171+ r6 = await _call ('fs_createDataset' , create_payload )
172+
173+ # If tool returned an error payload (not exception), also retry with CSV features
174+ if isinstance (r6 , dict ) and isinstance (r6 .get ("results" ), dict ):
175+ err = r6 ["results" ].get ("error" )
176+ if isinstance (err , str ) and ("NoneType" in err or "string" in err ):
177+ create_payload_retry = dict (create_payload )
178+ create_payload_retry ["feature_selection" ] = "," .join (feature_selection )
179+ print ("Retrying fs_createDataset with CSV features (error payload):" , create_payload_retry )
180+ r6 = await _call ('fs_createDataset' , create_payload_retry )
181+
182+ print ("fs_createDataset →" , r6 )
183+
184+ asyncio .run (runner ())
79185
80186 elif args .action == 'cleanup' :
81187 list_of_tables = db_list_tables ()
0 commit comments