44from typing import Optional
55from fastapi import FastAPI
66import requests
7+ from sqlalchemy import Engine , create_engine
78from sqlmodel import Session , select
8- from apps .datasource .models .datasource import CoreDatasource
9+ from apps .datasource .models .datasource import CoreDatasource , DatasourceConf
10+
911from apps .system .models .system_model import AssistantModel
1012from apps .system .schemas .auth import CacheName , CacheNamespace
1113from apps .system .schemas .system_schema import AssistantOutDsSchema , UserInfoDTO
@@ -92,7 +94,7 @@ def __init__(self, assistant: AssistantModel, certificate: Optional[str] = None)
9294 self .get_ds_from_api (certificate )
9395
9496 #@cache(namespace=CacheNamespace.EMBEDDED_INFO, cacheName=CacheName.ASSISTANT_DS, keyExpression="current_user.id")
95- async def get_ds_from_api (self , certificate : Optional [str ] = None ):
97+ def get_ds_from_api (self , certificate : Optional [str ] = None ):
9698 config : dict [any ] = json .loads (self .assistant .configuration )
9799 endpoint : str = config ['endpoint' ]
98100 certificateList : list [any ] = json .loads (certificate )
@@ -106,7 +108,7 @@ async def get_ds_from_api(self, certificate: Optional[str] = None):
106108
107109 res = requests .get (url = endpoint , headers = header , cookies = cookies , timeout = 10 )
108110 if res .status_code == 200 :
109- result_json : dict [any ] = json .loads (res .json () )
111+ result_json : dict [any ] = json .loads (res .text )
110112 if result_json .get ('code' ) == 0 :
111113 temp_list = result_json .get ('data' , [])
112114 self .ds_list = [
@@ -144,20 +146,35 @@ def get_db_schema(self, ds_id: int) -> str:
144146 def get_ds (self , ds_id : int ):
145147 if self .ds_list :
146148 for ds in self .ds_list :
147- if ds [ 'id' ] == ds_id :
149+ if ds . id == ds_id :
148150 return ds
149151 else :
150152 raise Exception ("Datasource list is not found." )
151153 raise Exception (f"Datasource with id { ds_id } not found." )
152- def get_ds_engine (self , ds_id : int ):
153- ds = self .get_ds (ds_id )
154- ds_type = ds .get ('type' ) if ds else None
155- if not ds_type :
156- raise Exception (f"Datasource with id { ds_id } not found or type is not defined." )
157- return ds_type
154+
158155
159156class AssistantOutDsFactory :
160157 @staticmethod
161158 def get_instance (assistant : AssistantModel , certificate : Optional [str ] = None ) -> AssistantOutDs :
162159 return AssistantOutDs (assistant , certificate )
160+
161+ def get_ds_engine (ds : AssistantOutDsSchema ) -> Engine :
162+ timeout : int = 30
163+ connect_args = {"connect_timeout" : timeout }
164+ conf = DatasourceConf (
165+ host = ds .host ,
166+ port = ds .port ,
167+ username = ds .user ,
168+ password = ds .password ,
169+ database = ds .dataBase ,
170+ driver = '' ,
171+ extraJdbc = ds .extraParams ,
172+ dbSchema = ds .schema or ''
173+ )
174+ from apps .db .db import get_uri_from_config
175+ uri = get_uri_from_config (ds .type , conf )
176+ if ds .type == "pg" and ds .schema :
177+ connect_args .update ({"options" : f"-c search_path={ ds .schema } " })
178+ engine = create_engine (uri , connect_args = connect_args , pool_timeout = timeout , pool_size = 20 , max_overflow = 10 )
179+ return engine
163180
0 commit comments