|
1 | 1 | import asyncio |
2 | 2 | import hashlib |
| 3 | +import io |
3 | 4 | import os |
4 | 5 | import traceback |
5 | 6 | import uuid |
6 | 7 | from io import StringIO |
7 | 8 | from typing import List |
| 9 | +from urllib.parse import quote |
8 | 10 |
|
9 | 11 | import orjson |
10 | 12 | import pandas as pd |
11 | 13 | from fastapi import APIRouter, File, UploadFile, HTTPException, Path |
| 14 | +from fastapi.responses import StreamingResponse |
| 15 | +from sqlalchemy import and_ |
12 | 16 |
|
13 | 17 | from apps.db.db import get_schema |
14 | 18 | from apps.db.engine import get_engine_conn |
@@ -81,7 +85,6 @@ def inner(): |
81 | 85 | await asyncio.to_thread(inner) |
82 | 86 |
|
83 | 87 |
|
84 | | - |
85 | 88 | @router.post("/update", response_model=CoreDatasource, summary=f"{PLACEHOLDER_PREFIX}ds_update") |
86 | 89 | @require_permissions(permission=SqlbotPermission(type='ds', keyExpression="ds.id")) |
87 | 90 | async def update(session: SessionDep, trans: Trans, user: CurrentUser, ds: CoreDatasource): |
@@ -360,3 +363,132 @@ def insert_pg(df, tableName, engine): |
360 | 363 | finally: |
361 | 364 | cursor.close() |
362 | 365 | conn.close() |
| 366 | + |
| 367 | + |
| 368 | +t_sheet = "数据表列表" |
| 369 | +t_n_col = "表名" |
| 370 | +t_c_col = "表备注" |
| 371 | +f_n_col = "字段名" |
| 372 | +f_c_col = "字段备注" |
| 373 | + |
| 374 | + |
| 375 | +@router.get("/exportDsSchema/{id}") |
| 376 | +async def export_ds_schema(session: SessionDep, id: int = Path(..., description=f"{PLACEHOLDER_PREFIX}ds_id")): |
| 377 | + # { |
| 378 | + # 'sheet':'', sheet name |
| 379 | + # 'c1_h':'', column1 column name |
| 380 | + # 'c2_h':'', column2 column name |
| 381 | + # 'c1':[], column1 data |
| 382 | + # 'c2':[], column2 data |
| 383 | + # } |
| 384 | + def inner(): |
| 385 | + if id == 0: # download template |
| 386 | + file_name = '批量上传备注' |
| 387 | + df_list = [ |
| 388 | + {'sheet': t_sheet, 'c1_h': t_n_col, 'c2_h': t_c_col, 'c1': ["user", "score"], |
| 389 | + 'c2': ["用来存放用户信息的数据表", "用来存放用户课程信息的数据表"]}, |
| 390 | + {'sheet': '数据表1', 'c1_h': f_n_col, 'c2_h': f_c_col, 'c1': ["id", "name"], |
| 391 | + 'c2': ["用户id", "用户姓名"]}, |
| 392 | + {'sheet': '数据表2', 'c1_h': f_n_col, 'c2_h': f_c_col, 'c1': ["course", "user_id", "score"], |
| 393 | + 'c2': ["课程名称", "用户ID", "课程得分"]}, |
| 394 | + ] |
| 395 | + else: |
| 396 | + ds = session.query(CoreDatasource).filter(CoreDatasource.id == id).first() |
| 397 | + file_name = ds.name |
| 398 | + tables = session.query(CoreTable).filter(CoreTable.ds_id == id).all() |
| 399 | + if len(tables) == 0: |
| 400 | + raise HTTPException(400, "No tables") |
| 401 | + |
| 402 | + df_list = [] |
| 403 | + df1 = {'sheet': t_sheet, 'c1_h': t_n_col, 'c2_h': t_c_col, 'c1': [], 'c2': []} |
| 404 | + df_list.append(df1) |
| 405 | + for table in tables: |
| 406 | + df1['c1'].append(table.table_name) |
| 407 | + df1['c2'].append(table.custom_comment) |
| 408 | + |
| 409 | + fields = session.query(CoreField).filter(CoreField.table_id == table.id).all() |
| 410 | + df_fields = {'sheet': table.table_name, 'c1_h': f_n_col, 'c2_h': f_c_col, 'c1': [], 'c2': []} |
| 411 | + for field in fields: |
| 412 | + df_fields['c1'].append(field.field_name) |
| 413 | + df_fields['c2'].append(field.custom_comment) |
| 414 | + df_list.append(df_fields) |
| 415 | + |
| 416 | + # build dataframe and export |
| 417 | + output = io.BytesIO() |
| 418 | + |
| 419 | + with pd.ExcelWriter(output, engine='xlsxwriter') as writer: |
| 420 | + for df in df_list: |
| 421 | + pd.DataFrame({df['c1_h']: df['c1'], df['c2_h']: df['c2']}).to_excel(writer, sheet_name=df['sheet'], |
| 422 | + index=False) |
| 423 | + |
| 424 | + output.seek(0) |
| 425 | + |
| 426 | + filename = f'{file_name}.xlsx' |
| 427 | + encoded_filename = quote(filename) |
| 428 | + return io.BytesIO(output.getvalue()) |
| 429 | + |
| 430 | + # headers = { |
| 431 | + # 'Content-Disposition': f"attachment; filename*=UTF-8''{encoded_filename}" |
| 432 | + # } |
| 433 | + |
| 434 | + result = await asyncio.to_thread(inner) |
| 435 | + return StreamingResponse( |
| 436 | + result, |
| 437 | + media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" |
| 438 | + ) |
| 439 | + |
| 440 | + |
| 441 | +@router.post("/uploadDsSchema/{id}") |
| 442 | +async def upload_ds_schema(session: SessionDep, id: int = Path(..., description=f"{PLACEHOLDER_PREFIX}ds_id"), |
| 443 | + file: UploadFile = File(...)): |
| 444 | + ALLOWED_EXTENSIONS = {"xlsx", "xls"} |
| 445 | + if not file.filename.lower().endswith(tuple(ALLOWED_EXTENSIONS)): |
| 446 | + raise HTTPException(400, "Only support .xlsx/.xls") |
| 447 | + |
| 448 | + try: |
| 449 | + contents = await file.read() |
| 450 | + excel_file = io.BytesIO(contents) |
| 451 | + |
| 452 | + sheet_names = pd.ExcelFile(excel_file, engine="openpyxl").sheet_names |
| 453 | + |
| 454 | + excel_file.seek(0) |
| 455 | + |
| 456 | + field_sheets = [] |
| 457 | + table_sheet = None # [] |
| 458 | + for sheet in sheet_names: |
| 459 | + df = pd.read_excel(excel_file, sheet_name=sheet, engine="openpyxl") |
| 460 | + if sheet == t_sheet: |
| 461 | + table_sheet = df.where(pd.notnull(df), None).to_dict(orient="records") |
| 462 | + else: |
| 463 | + field_sheets.append( |
| 464 | + {'sheet_name': sheet, 'data': df.where(pd.notnull(df), None).to_dict(orient="records")}) |
| 465 | + |
| 466 | + # print(field_sheets) |
| 467 | + |
| 468 | + # get data and update |
| 469 | + # update table comment |
| 470 | + if table_sheet and len(table_sheet) > 0: |
| 471 | + for table in table_sheet: |
| 472 | + session.query(CoreTable).filter( |
| 473 | + and_(CoreTable.ds_id == id, CoreTable.table_name == table[t_n_col])).update( |
| 474 | + {'custom_comment': table[t_c_col]}) |
| 475 | + |
| 476 | + # update field comment |
| 477 | + if field_sheets and len(field_sheets) > 0: |
| 478 | + for fields in field_sheets: |
| 479 | + if len(fields['data']) > 0: |
| 480 | + # get table id |
| 481 | + table = session.query(CoreTable).filter( |
| 482 | + and_(CoreTable.ds_id == id, CoreTable.table_name == fields['sheet_name'])).first() |
| 483 | + if table: |
| 484 | + for field in fields['data']: |
| 485 | + session.query(CoreField).filter( |
| 486 | + and_(CoreField.ds_id == id, |
| 487 | + CoreField.table_id == table.id, |
| 488 | + CoreField.field_name == field[f_n_col])).update( |
| 489 | + {'custom_comment': field[f_c_col]}) |
| 490 | + session.commit() |
| 491 | + |
| 492 | + return True |
| 493 | + except Exception as e: |
| 494 | + raise HTTPException(status_code=500, detail=f"解析 Excel 失败: {str(e)}") |
0 commit comments