@@ -14,16 +14,14 @@ import (
1414 "strings"
1515 "sync"
1616
17+ "github.com/99designs/gqlgen/graphql"
18+ "github.com/99designs/gqlgen/graphql/executor"
1719 "github.com/actiontech/dms/internal/dms/pkg/constant"
1820 "github.com/actiontech/dms/internal/pkg/cloudbeaver"
1921 "github.com/actiontech/dms/internal/pkg/cloudbeaver/model"
2022 "github.com/actiontech/dms/internal/pkg/cloudbeaver/resolver"
21-
2223 "github.com/actiontech/dms/pkg/dms-common/api/jwt"
2324 "github.com/actiontech/dms/pkg/dms-common/pkg/aes"
24-
25- "github.com/99designs/gqlgen/graphql"
26- "github.com/99designs/gqlgen/graphql/executor"
2725 utilLog "github.com/actiontech/dms/pkg/dms-common/pkg/log"
2826 "github.com/labstack/echo/v4"
2927)
@@ -334,19 +332,13 @@ func (cu *CloudbeaverUsecase) GraphQLDistributor() echo.MiddlewareFunc {
334332 ctx := graphql .StartOperationTrace (context .Background ())
335333
336334 var dbService * DBService
337-
338335 if params .OperationName == "asyncSqlExecuteQuery" {
339336 dbService , err = cu .getDbService (c .Request ().Context (), params )
340337 if err != nil {
341338 cu .log .Error (err )
342339 return err
343340 }
344341
345- err = cu .SaveCbOpLog (c , dbService , params , next )
346- if err != nil {
347- return err
348- }
349-
350342 if ! cu .isEnableSQLAudit (dbService ) {
351343 cloudbeaverResBuf := new (bytes.Buffer )
352344 mw := io .MultiWriter (c .Response ().Writer , cloudbeaverResBuf )
@@ -378,6 +370,70 @@ func (cu *CloudbeaverUsecase) GraphQLDistributor() echo.MiddlewareFunc {
378370 ctx = context .WithValue (ctx , cloudbeaver .SQLEDirectAudit , directAuditReq )
379371 }
380372
373+ if params .OperationName == "updateResultsDataBatch" {
374+ cloudbeaverResBuf := new (bytes.Buffer )
375+ mw := io .MultiWriter (c .Response ().Writer , cloudbeaverResBuf )
376+ writer := & cloudbeaverResponseWriter {Writer : mw , ResponseWriter : c .Response ().Writer }
377+ c .Response ().Writer = writer
378+
379+ if err = next (c ); err != nil {
380+ return err
381+ }
382+
383+ if err := cu .SaveUiOp (c , cloudbeaverResBuf , params ); err != nil {
384+ cu .log .Errorf ("save ui op err: %v" , err )
385+ return nil
386+ }
387+
388+ return nil
389+ }
390+
391+ if params .OperationName == "getAsyncTaskInfo" {
392+ cloudbeaverResBuf := new (bytes.Buffer )
393+ mw := io .MultiWriter (c .Response ().Writer , cloudbeaverResBuf )
394+ writer := & cloudbeaverResponseWriter {Writer : mw , ResponseWriter : c .Response ().Writer }
395+ c .Response ().Writer = writer
396+
397+ if err = next (c ); err != nil {
398+ return err
399+ }
400+
401+ cbUid , exist := taskIDAssocUid .Load (params .Variables ["taskId" ])
402+ if ! exist {
403+ return nil
404+ }
405+ cbUidStr , ok := cbUid .(string )
406+ if ! ok {
407+ return nil
408+ }
409+
410+ operationLog , err := cu .cbOperationLogUsecase .GetCbOperationLogByID (ctx , cbUidStr )
411+ if err != nil {
412+ cu .log .Errorf ("get cb operation log by id %s failed: %v" , cbUidStr , err )
413+ return nil
414+ } else {
415+ var taskInfo TaskInfo
416+ if err := json .Unmarshal (cloudbeaverResBuf .Bytes (), & taskInfo ); err != nil {
417+ cu .log .Errorf ("extract task id err: %v" , err )
418+ return nil
419+ }
420+
421+ task := taskInfo .Data .TaskInfo
422+ if task .Running == true || task .Error == nil {
423+ return nil
424+ }
425+
426+ operationLog .ExecResult = * task .Error .Message
427+ err := cu .cbOperationLogUsecase .UpdateCbOperationLog (ctx , operationLog )
428+ if err != nil {
429+ cu .log .Error (err )
430+ return nil
431+ }
432+ }
433+
434+ return nil
435+ }
436+
381437 if params .OperationName == "getSqlExecuteTaskResults" {
382438 cloudbeaverResBuf := new (bytes.Buffer )
383439 mw := io .MultiWriter (c .Response ().Writer , cloudbeaverResBuf )
@@ -418,9 +474,14 @@ func (cu *CloudbeaverUsecase) GraphQLDistributor() echo.MiddlewareFunc {
418474 cloudbeaverNext = func (c echo.Context ) ([]byte , error ) {
419475 resp , ok = c .Get (cloudbeaver .AuditResultKey ).(cloudbeaver.AuditResults )
420476 if ok {
421- cu .UpdateCbOp (params , ctx , resp )
422477 if ! resp .IsSuccess {
478+ cu .SaveCbOperationLogWithoutNext (c , dbService , params , resp )
423479 return nil , c .JSON (http .StatusOK , convertToResp (resp ))
480+ } else {
481+ err = cu .SaveCbOpLog (c , dbService , params , resp , next )
482+ if err != nil {
483+ return nil , nil
484+ }
424485 }
425486 }
426487
@@ -446,9 +507,16 @@ func (cu *CloudbeaverUsecase) GraphQLDistributor() echo.MiddlewareFunc {
446507 } else {
447508 cloudbeaverNext = func (c echo.Context ) ([]byte , error ) {
448509 resp , ok = c .Get (cloudbeaver .AuditResultKey ).(cloudbeaver.AuditResults )
449- cu .UpdateCbOp (params , ctx , resp )
450- if ! resp .IsSuccess {
451- return nil , c .JSON (http .StatusOK , convertToResp (resp ))
510+ if ok {
511+ if ! resp .IsSuccess {
512+ cu .SaveCbOperationLogWithoutNext (c , dbService , params , resp )
513+ return nil , c .JSON (http .StatusOK , convertToResp (resp ))
514+ } else {
515+ err = cu .SaveCbOpLog (c , dbService , params , resp , next )
516+ if err != nil {
517+ return nil , nil
518+ }
519+ }
452520 }
453521
454522 resWrite = & responseProcessWriter {tmp : & bytes.Buffer {}, ResponseWriter : c .Response ().Writer }
0 commit comments