Skip to content

Conversation

@littleniannian
Copy link
Collaborator

@littleniannian littleniannian commented Nov 28, 2025

User description

关联的 issue

actiontech/sqle#3133

描述你的变更

  • 增加odc对接sqle的审核功能

确认项(pr提交后操作)

Tip

请在指定复审人之前,确认并完成以下事项,完成后✅


  • 我已完成自测
  • 我已记录完整日志方便进行诊断
  • 我已在关联的issue里补充了实现方案
  • 我已在关联的issue里补充了测试影响面
  • 我已确认了变更的兼容性,如果不兼容则在issue里标记 not_compatible
  • 我已确认了是否要更新文档,如果要更新则在issue里标记 need_update_doc


Description

  • 为 SQL 工作台新增审核中间件

  • 扩展 streamExecute 请求及 sid 解析

  • 集成 SQLE 审核接口及审批逻辑

  • 增加操作日志记录与响应拦截


Diagram Walkthrough

flowchart LR
  A["新增审核中间件"] --> B["解析请求体中SQL/sid"]
  B --> C["调用SQLE审核接口"]
  C --> D["判断是否需要审批"]
  D --> E["记录审核拦截日志"]
  D --> F["执行真实SQL请求"]
Loading

File Walkthrough

Relevant files
Enhancement
router.go
调用审核中间件                                                                                                   

internal/apiserver/service/router.go

  • 添加 SQL 审核中间件调用
+1/-0     
model.go
增加审核结果消息字段                                                                                             

internal/dms/storage/model/model.go

  • 在 AuditResult 结构中新增 Message 字段
+1/-0     
graphql.go
修改审核响应类型名称                                                                                             

internal/pkg/cloudbeaver/graphql.go

  • 重命名 auditSQLReply 为 AuditSQLReply
  • 修改 POST 请求中的反馈类型引用
+2/-2     
sql_workbench_service.go
增加审核中间件及审批日志功能                                                                                     

internal/sql_workbench/service/sql_workbench_service.go

  • 新增 SQL 工作台审核中间件及审批逻辑
  • 添加解析请求体、sid 解析函数
  • 增加响应拦截器与 gzip 处理逻辑
  • 整合 SQLE 审核结果到响应数据
+762/-0 

@github-actions
Copy link

github-actions bot commented Nov 28, 2025

PR Reviewer Guide 🔍

(Review updated until commit ff96645)

⏱️ Estimated effort to review: 5 🔵🔵🔵🔵🔵
🧪 No relevant tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

错误处理

在 AuditMiddleware 及相关解析和调用 SQLE 接口的逻辑中,遇到错误时直接返回详细错误信息,建议检查错误返回是否符合用户接口的要求,避免暴露内部实现细节,同时考虑更友好的错误响应逻辑。

// AuditMiddleware 拦截工作台odc请求进行加工
func (sqlWorkbenchService *SqlWorkbenchService) AuditMiddleware() echo.MiddlewareFunc {
	return func(next echo.HandlerFunc) echo.HandlerFunc {
		return func(c echo.Context) error {
			// 只拦截包含 /streamExecute 的请求
			if !strings.Contains(c.Request().URL.Path, "/streamExecute") {
				return next(c)
			}

			// 读取请求体
			bodyBytes, err := io.ReadAll(c.Request().Body)
			if err != nil {
				return fmt.Errorf("failed to read request body: %w", err)
			}
			// 恢复请求体,供后续处理使用
			c.Request().Body = io.NopCloser(bytes.NewBuffer(bodyBytes))

			// 解析请求体获取 SQL 和 datasource ID
			sql, datasourceID, err := sqlWorkbenchService.parseStreamExecuteRequest(bodyBytes)
			if err != nil {
				return fmt.Errorf("failed to parse streamExecute request, skipping audit: %v", err)
			}

			if sql == "" || datasourceID == "" {
				return fmt.Errorf("SQL or datasource ID is empty, skipping audit")
			}

			// 获取当前用户 ID
			dmsUserId, err := sqlWorkbenchService.getDMSUserIdFromRequest(c)
			if err != nil {
				return fmt.Errorf("failed to get DMS user ID: %v", err)
			}

			// 从缓存表获取 dms_db_service_id
			dmsDBServiceID, err := sqlWorkbenchService.getDMSDBServiceIDFromCache(c.Request().Context(), datasourceID, dmsUserId)
			if err != nil {
				return fmt.Errorf("failed to get dms_db_service_id from cache: %v", err)
			}

			if dmsDBServiceID == "" {
				return fmt.Errorf("dms_db_service_id not found in cache for datasource: %s", datasourceID)
			}

			// 获取 DBService 信息
			dbService, err := sqlWorkbenchService.dbServiceUsecase.GetDBService(c.Request().Context(), dmsDBServiceID)
			if err != nil {
				return fmt.Errorf("failed to get DBService: %v", err)
			}

			// 检查是否启用 SQL 审核
			if !sqlWorkbenchService.isEnableSQLAudit(dbService) {
				return fmt.Errorf("SQL audit is not enabled for DBService: %s", dmsDBServiceID)
			}

			// 调用 SQLE 审核接口
			auditResult, err := sqlWorkbenchService.callSQLEAudit(c.Request().Context(), sql, dbService)
			if err != nil {
				return fmt.Errorf("call SQLE audit failed: %v", err)
			}

			// 拦截响应并添加审核结果
			return sqlWorkbenchService.interceptAndAddAuditResult(c, next, dmsUserId, auditResult, dbService)
		}
	}
}
代码可读性

部分函数(如 convertSQLEAuditToViolatedRules 和 mapAuditLevelToNumber)中对审核结果转换和级别映射采用了硬编码方式,建议抽取公共逻辑进行复用和单元测试验证,确保后续维护更加清晰。

func (sqlWorkbenchService *SqlWorkbenchService) convertSQLEAuditToViolatedRules(auditResult *cloudbeaver.AuditSQLResV2) []StreamExecuteRule {
	var violatedRules []StreamExecuteRule

	// 将 SQLE 的 audit_result 转换为 violatedRules 格式
	for _, auditItem := range auditResult.AuditResult {
		// 映射 level 字符串到数字
		levelNum := sqlWorkbenchService.mapAuditLevelToNumber(auditItem.Level)

		violatedRule := StreamExecuteRule{
			AppliedDialectTypes: nil,
			CreateTime:          nil,
			Enabled:             nil,
			ID:                  nil,
			Level:               levelNum,
			Metadata:            nil,
			OrganizationID:      nil,
			Properties:          nil,
			RulesetID:           nil,
			UpdateTime:          nil,
			Violation: StreamExecuteViolation{
				Level:            levelNum,
				LocalizedMessage: auditItem.Message,
				Offset:           0, // SQLE 审核结果可能没有 offset 信息
				Start:            0,
				Stop:             0,
				Text:             auditResult.ExecSQL,
			},
		}
		violatedRules = append(violatedRules, violatedRule)
	}

	return violatedRules
}

// mapAuditLevelToNumber 将审核级别字符串映射到数字
// normal=0, notice=3, warn=1, error=2
func (sqlWorkbenchService *SqlWorkbenchService) mapAuditLevelToNumber(level string) int {
	switch strings.ToLower(level) {
	case "normal":
		return 0
	case "notice":
		return 3
	case "warn":
		return 1
	case "error":
		return 2
	default:
		return 0 // 默认为 normal
	}
}
数据源匹配

getDMSDBServiceIDFromCache 中在处理 datasourceID 转换及匹配时,如果转换失败直接返回第一个匹配的数据源,建议确认该逻辑是否满足产品业务场景,避免可能的匹配误差。

// getDMSDBServiceIDFromCache 从 sql_workbench_datasource_caches 表获取 dms_db_service_id
func (sqlWorkbenchService *SqlWorkbenchService) getDMSDBServiceIDFromCache(ctx context.Context, datasourceID, dmsUserID string) (string, error) {
	// 尝试将 datasourceID 转换为 int64(ODC 的 datasource ID 通常是数字)
	var sqlWorkbenchDatasourceID int64
	if _, err := fmt.Sscanf(datasourceID, "%d", &sqlWorkbenchDatasourceID); err != nil {
		// 如果转换失败,尝试直接使用字符串作为 datasource ID
		sqlWorkbenchService.log.Debugf("Failed to convert datasourceID to int64, trying to find by string: %s", datasourceID)
	}

	// 从缓存表中查找,需要根据 sql_workbench_datasource_id 和 dms_user_id 查找
	// 由于缓存表可能没有直接存储 sql_workbench_datasource_id,我们需要通过其他方式查找
	// 这里先尝试通过用户 ID 获取所有数据源,然后匹配
	datasources, err := sqlWorkbenchService.sqlWorkbenchDatasourceRepo.GetSqlWorkbenchDatasourcesByUserID(ctx, dmsUserID)
	if err != nil {
		return "", fmt.Errorf("failed to get datasources by user id: %v", err)
	}

	// 如果 datasourceID 是数字,尝试匹配 SqlWorkbenchDatasourceID
	if sqlWorkbenchDatasourceID > 0 {
		for _, ds := range datasources {
			if ds.SqlWorkbenchDatasourceID == sqlWorkbenchDatasourceID {
				return ds.DMSDBServiceID, nil
			}
		}
	}

	// 如果找不到,返回第一个匹配的数据源(临时方案,后续可能需要更精确的匹配逻辑)
	if len(datasources) > 0 {
		// 这里可以根据实际业务逻辑选择合适的数据源
		// 暂时返回第一个数据源的 dms_db_service_id
		return datasources[0].DMSDBServiceID, nil
	}

	return "", fmt.Errorf("no datasource found for datasourceID: %s, userID: %s", datasourceID, dmsUserID)
}

@github-actions
Copy link

Failed to generate code suggestions for PR

@github-actions
Copy link

github-actions bot commented Dec 4, 2025

Persistent review updated to latest commit 004b1cb

@github-actions
Copy link

github-actions bot commented Dec 4, 2025

PR Code Suggestions ✨

Latest suggestions up to ff96645

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
增加 nil 检查

在处理审核结果前,建议增加对 auditResult.Data 是否为 nil 的检查,以防止解引用 nil 指针从而触发 panic。确保在进入 for 循环之前验证
auditResult.Data 是否为 nil,如遇到 nil 时返回适当的错误提示。

internal/sql_workbench/service/sql_workbench_service.go [1265-1270]

 func (sqlWorkbenchService *SqlWorkbenchService) buildAuditResponseWithoutExecution(c echo.Context, userId string, auditResult *cloudbeaver.AuditSQLReply, dbService *biz.DBService) error {
+	// 检查 auditResult.Data 是否为 nil,防止 nil 指针异常
+	if auditResult.Data == nil {
+		return fmt.Errorf("audit result data is nil")
+	}
 	// 构造 SQL 条目列表
 	sqlItems := make([]StreamExecuteSQLItem, 0, len(auditResult.Data.SQLResults))
 	for _, sqlResult := range auditResult.Data.SQLResults {
 		// 转换审核结果为 violatedRules 格式
 		violatedRules := sqlWorkbenchService.convertSQLEAuditToViolatedRules(&sqlResult)
 		...
 	}
Suggestion importance[1-10]: 8

__

Why: The suggestion correctly adds a defensive check for nil on auditResult.Data to avoid potential panic when dereferencing, which is important for robustness. It directly addresses a possible runtime error in the buildAuditResponseWithoutExecution function.

Medium

Previous suggestions

Suggestions up to commit 83b74ee
CategorySuggestion                                                                                                                                    Impact
Possible issue
避免重复写响应头

建议在延迟函数中写入响应头之前增加检查,确保不重复调用 WriteHeader,从而避免多次写响应头可能引发
panic。可以在自定义响应拦截器中增加一个标志位来记录是否已经写入响应头。

internal/sql_workbench/service/sql_workbench_service.go [1415-1490]

 defer func() {
-        // 在 defer 中处理响应
-        if srw.status != 0 {
-            srw.original.WriteHeader(srw.status)
+        // 在 defer 中处理响应,确保只写一次头部
+        if !srw.headerWritten {
+            if srw.status != 0 {
+                srw.original.WriteHeader(srw.status)
+                srw.headerWritten = true
+            } else {
+                srw.original.WriteHeader(http.StatusOK)
+                srw.headerWritten = true
+            }
         }
-        ...
-        srw.original.WriteHeader(statusCode)
         ...
 }()
Suggestion importance[1-10]: 5

__

Why: 该建议针对在 defer 函数中重复写响应头可能导致 panic 的问题,虽然引入标志位可以提高安全性,但需要整体重构响应拦截器,影响有限,因此评分为 5.

Low
Suggestions up to commit 1e9542d
CategorySuggestion                                                                                                                                    Impact
Possible issue
限制请求体大小

建议在读取请求体之前检查请求体的大小,防止超大请求加载到内存中导致内存耗尽或引发 panic 问题。可以在读取前使用限制器或者检测 Content-Length。

internal/sql_workbench/service/sql_workbench_service.go [1009-1013]

+// 检查请求体大小(示例:限制最大读取 10MB)
+const maxBodySize = 10 * 1024 * 1024
+if c.Request().ContentLength > maxBodySize {
+    return fmt.Errorf("request body too large")
+}
 // 读取请求体
 bodyBytes, err := io.ReadAll(c.Request().Body)
 if err != nil {
     return fmt.Errorf("failed to read request body: %w", err)
 }
Suggestion importance[1-10]: 6

__

Why: 该建议在调用 io.ReadAll 之前添加了请求体大小检查,有助于防止内存耗尽问题,虽然不是修复当前代码的重大错误,但能提高健壮性。

Low
避免头部重复写入

建议在调用 WriteHeader 前先判断是否已经写入响应头,以避免重复写入导致 panic。确保只调用一次
WriteHeader,并在写入响应之前检测状态码是否已发送。

internal/sql_workbench/service/sql_workbench_service.go [1307-1311]

 defer func() {
-    if srw.status != 0 {
+    if srw.status != 0 && !headerWritten(srw.original) {
         srw.original.WriteHeader(srw.status)
     }
     ...
 }()
 
+// 示例辅助函数,用于判断响应头是否已写入
+func headerWritten(w http.ResponseWriter) bool {
+    if hw, ok := w.(interface{ Written() bool }); ok {
+        return hw.Written()
+    }
+    return false
+}
+
Suggestion importance[1-10]: 5

__

Why: 建议在调用 WriteHeader 前检测响应头是否已写入,以避免重复写入导致 panic,属于一项稳健性防御性改进,对代码整体影响有限。

Low
Suggestions up to commit f31fa0f
CategorySuggestion                                                                                                                                    Impact
Possible issue
修正审核等级映射

建议修正 mapAuditLevelToNumber 函数中审核等级的映射逻辑,以使返回值与注释中描述的 (normal=0, notice=1, warn=2,
error=3) 一致。此错误会导致审核审批的逻辑依据错误的等级判断,从而引发潜在的业务逻辑问题。

internal/sql_workbench/service/sql_workbench_service.go [1546-1559]

 func (sqlWorkbenchService *SqlWorkbenchService) mapAuditLevelToNumber(level string) int {
 	switch strings.ToLower(level) {
 	case "normal":
 		return 0
 	case "notice":
+		return 1
+	case "warn":
+		return 2
+	case "error":
 		return 3
-	case "warn":
-		return 1
-	case "error":
-		return 2
 	default:
-		return 0 // 默认为 notice
+		return 1 // 默认为 notice
 	}
 }
Suggestion importance[1-10]: 8

__

Why: The suggestion correctly identifies a mismatch in the mapping logic of mapAuditLevelToNumber, aligning the return values with the documented mapping (normal=0, notice=1, warn=2, error=3), which is crucial for proper audit logic.

Medium
Suggestions up to commit 004b1cb
CategorySuggestion                                                                                                                                    Impact
Possible issue
改进错误处理逻辑

建议在读取请求体失败时立即返回错误响应,而不是直接调用 next(c),以防绕过必要的审核流程。这样可以避免因请求体读取失败而导致潜在的安全审计空白或错误记录问题。

internal/sql_workbench/service/sql_workbench_service.go [1008-1012]

 bodyBytes, err := io.ReadAll(c.Request().Body)
 if err != nil {
 	sqlWorkbenchService.log.Errorf("Failed to read request body: %v", err)
-	return next(c)
+	return c.JSON(http.StatusBadRequest, map[string]string{"error": "读取请求体失败"})
 }
Suggestion importance[1-10]: 6

__

Why: The suggestion improves error handling by returning an error response instead of proceeding with next(c), which could prevent bypassing audit checks; however, this change might alter the intended middleware flow.

Low
General
加强 SQL 匹配准确性

建议对 SQL 进行更严格的一致性规范化处理,比如统一转换为小写再比较,防止因大小写或额外空格导致匹配失败。这可以确保审核结果能准确对应到实际执行的 SQL 条目上。

internal/sql_workbench/service/sql_workbench_service.go [1456-1458]

-normalizedSQL := strings.TrimSpace(strings.TrimSuffix(sqlItem.SQLTuple.OriginalSQL, ";"))
+normalizedSQL := strings.ToLower(strings.TrimSpace(strings.TrimSuffix(sqlItem.SQLTuple.OriginalSQL, ";")))
 if auditResult, found := sqlAuditMap[normalizedSQL]; found {
 	matchedAuditResult = auditResult
 }
Suggestion importance[1-10]: 5

__

Why: Adding a conversion to lower case enhances consistency in SQL matching, reducing potential mismatches due to case differences; the improvement is correct but has only a minor impact.

Low

@github-actions
Copy link

github-actions bot commented Dec 4, 2025

Persistent review updated to latest commit f31fa0f

@github-actions
Copy link

github-actions bot commented Dec 4, 2025

Persistent review updated to latest commit 1e9542d

@github-actions
Copy link

github-actions bot commented Dec 4, 2025

Persistent review updated to latest commit 83b74ee

@github-actions
Copy link

github-actions bot commented Dec 5, 2025

Persistent review updated to latest commit ff96645

@BugsGuru BugsGuru merged commit 9d6c9f3 into main Dec 5, 2025
1 check passed
@BugsGuru BugsGuru deleted the feat_sqlworkbench_audit branch December 5, 2025 05:56
BugsGuru pushed a commit that referenced this pull request Jan 4, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants