|
1 | | -import pyodbc |
2 | | -from apps.datasource.models.datasource import DatasourceConf |
| 1 | +from sqlalchemy import create_engine, text, Result |
| 2 | +from sqlalchemy.orm import sessionmaker |
| 3 | +from apps.datasource.models.datasource import DatasourceConf, CoreDatasource, TableSchema |
| 4 | +import urllib.parse |
| 5 | +from typing import Any |
3 | 6 |
|
4 | | -def get_connection(conf: DatasourceConf): |
5 | | - conn_str = ( |
6 | | - "DRIVER={MySQL ODBC 9.3 Unicode Driver};" # todo driver config |
7 | | - f"SERVER={conf.host};" |
8 | | - f"DATABASE={conf.database};" |
9 | | - f"UID={conf.username};" |
10 | | - f"PWD={conf.password};" |
11 | | - f"PORT={conf.port};" |
12 | | - ) |
13 | | - conn = None |
14 | | - try: |
15 | | - conn = pyodbc.connect(conn_str) |
16 | | - print("Connect Success") |
17 | | - return conn |
18 | | - except pyodbc.Error as e: |
19 | | - print(f"Connect Fail:{e}") |
20 | | - raise e |
21 | | - finally: |
22 | | - if conn is not None: |
23 | | - conn.close() |
| 7 | + |
| 8 | +def get_session(conf: DatasourceConf, ds: CoreDatasource): |
| 9 | + db_url: str |
| 10 | + if ds.type == "mysql": |
| 11 | + db_url = f"mysql+pymysql://{urllib.parse.quote(conf.username)}:{urllib.parse.quote(conf.password)}@{conf.host}:{conf.port}/{urllib.parse.quote(conf.database)}" |
| 12 | + else: |
| 13 | + raise 'The datasource type not support.' |
| 14 | + engine = create_engine(db_url) |
| 15 | + session_maker = sessionmaker(bind=engine) |
| 16 | + session = session_maker() |
| 17 | + return session |
| 18 | + |
| 19 | + |
| 20 | +def get_tables(conf: DatasourceConf, ds: CoreDatasource): |
| 21 | + session = get_session(conf, ds) |
| 22 | + result: Result[Any] |
| 23 | + if ds.type == "mysql": |
| 24 | + sql = f"""SELECT |
| 25 | + TABLE_NAME AS `Table Name`, |
| 26 | + TABLE_COMMENT AS `Table Comment` |
| 27 | + FROM |
| 28 | + information_schema.TABLES |
| 29 | + WHERE |
| 30 | + TABLE_SCHEMA = '{conf.database}';""" |
| 31 | + result = session.execute(text(sql)) |
| 32 | + |
| 33 | + res = result.fetchall() |
| 34 | + res_list = [TableSchema(*item) for item in res] |
| 35 | + return res_list |
| 36 | + |
| 37 | + |
| 38 | +def get_fields(conf: DatasourceConf, ds: CoreDatasource): |
| 39 | + session = get_session(conf, ds) |
| 40 | + result: Result[Any] |
| 41 | + pass |
0 commit comments