Skip to content

Commit 66bdae0

Browse files
committed
cleanup
1 parent 25cfc49 commit 66bdae0

File tree

2 files changed

+112
-94
lines changed

2 files changed

+112
-94
lines changed

dbos/conductor.go

Lines changed: 94 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -333,9 +333,11 @@ func (c *Conductor) handleExecutorInfoRequest(data []byte, requestID string) err
333333
}
334334

335335
response := executorInfoResponse{
336-
baseMessage: baseMessage{
337-
Type: executorInfo,
338-
RequestID: requestID,
336+
baseResponse: baseResponse{
337+
baseMessage: baseMessage{
338+
Type: executorInfo,
339+
RequestID: requestID,
340+
},
339341
},
340342
ExecutorID: c.dbosCtx.GetExecutorID(),
341343
ApplicationVersion: c.dbosCtx.GetApplicationVersion(),
@@ -369,12 +371,14 @@ func (c *Conductor) handleRecoveryRequest(data []byte, requestID string) error {
369371
}
370372

371373
response := recoveryConductorResponse{
372-
baseMessage: baseMessage{
373-
Type: recoveryMessage,
374-
RequestID: requestID,
374+
baseResponse: baseResponse{
375+
baseMessage: baseMessage{
376+
Type: recoveryMessage,
377+
RequestID: requestID,
378+
},
379+
ErrorMessage: errorMsg,
375380
},
376-
Success: success,
377-
ErrorMessage: errorMsg,
381+
Success: success,
378382
}
379383

380384
return c.sendResponse(response, "recovery response")
@@ -403,12 +407,14 @@ func (c *Conductor) handleCancelWorkflowRequest(data []byte, requestID string) e
403407
}
404408

405409
response := cancelWorkflowConductorResponse{
406-
baseMessage: baseMessage{
407-
Type: cancelWorkflowMessage,
408-
RequestID: requestID,
410+
baseResponse: baseResponse{
411+
baseMessage: baseMessage{
412+
Type: cancelWorkflowMessage,
413+
RequestID: requestID,
414+
},
415+
ErrorMessage: errorMsg,
409416
},
410-
Success: success,
411-
ErrorMessage: errorMsg,
417+
Success: success,
412418
}
413419

414420
return c.sendResponse(response, "cancel workflow response")
@@ -438,12 +444,14 @@ func (c *Conductor) handleResumeWorkflowRequest(data []byte, requestID string) e
438444
}
439445

440446
response := resumeWorkflowConductorResponse{
441-
baseMessage: baseMessage{
442-
Type: resumeWorkflowMessage,
443-
RequestID: requestID,
447+
baseResponse: baseResponse{
448+
baseMessage: baseMessage{
449+
Type: resumeWorkflowMessage,
450+
RequestID: requestID,
451+
},
452+
ErrorMessage: errorMsg,
444453
},
445-
Success: success,
446-
ErrorMessage: errorMsg,
454+
Success: success,
447455
}
448456

449457
return c.sendResponse(response, "resume workflow response")
@@ -505,18 +513,19 @@ func (c *Conductor) handleRetentionRequest(data []byte, requestID string) error
505513
}
506514

507515
response := retentionConductorResponse{
508-
baseMessage: baseMessage{
509-
Type: retentionMessage,
510-
RequestID: requestID,
516+
baseResponse: baseResponse{
517+
baseMessage: baseMessage{
518+
Type: retentionMessage,
519+
RequestID: requestID,
520+
},
521+
ErrorMessage: errorMsg,
511522
},
512-
Success: success,
513-
ErrorMessage: errorMsg,
523+
Success: success,
514524
}
515525

516526
return c.sendResponse(response, "retention response")
517527
}
518528

519-
520529
// sendResponse sends a response to the conductor via websocket
521530
func (c *Conductor) sendResponse(response any, responseType string) error {
522531
if c.conn == nil {
@@ -585,12 +594,14 @@ func (c *Conductor) handleListWorkflowsRequest(data []byte, requestID string) er
585594
c.logger.Error("Failed to list workflows", "error", err)
586595
errorMsg := fmt.Sprintf("failed to list workflows: %v", err)
587596
response := listWorkflowsConductorResponse{
588-
baseMessage: baseMessage{
589-
Type: listWorkflowsMessage,
590-
RequestID: requestID,
597+
baseResponse: baseResponse{
598+
baseMessage: baseMessage{
599+
Type: listWorkflowsMessage,
600+
RequestID: requestID,
601+
},
602+
ErrorMessage: &errorMsg,
591603
},
592-
Output: []listWorkflowsConductorResponseBody{},
593-
ErrorMessage: &errorMsg,
604+
Output: []listWorkflowsConductorResponseBody{},
594605
}
595606
return c.sendResponse(response, "list workflows response")
596607
}
@@ -602,9 +613,11 @@ func (c *Conductor) handleListWorkflowsRequest(data []byte, requestID string) er
602613
}
603614

604615
response := listWorkflowsConductorResponse{
605-
baseMessage: baseMessage{
606-
Type: listWorkflowsMessage,
607-
RequestID: requestID,
616+
baseResponse: baseResponse{
617+
baseMessage: baseMessage{
618+
Type: listWorkflowsMessage,
619+
RequestID: requestID,
620+
},
608621
},
609622
Output: formattedWorkflows,
610623
}
@@ -666,12 +679,14 @@ func (c *Conductor) handleListQueuedWorkflowsRequest(data []byte, requestID stri
666679
c.logger.Error("Failed to list queued workflows", "error", err)
667680
errorMsg := fmt.Sprintf("failed to list queued workflows: %v", err)
668681
response := listWorkflowsConductorResponse{
669-
baseMessage: baseMessage{
670-
Type: listQueuedWorkflowsMessage,
671-
RequestID: requestID,
682+
baseResponse: baseResponse{
683+
baseMessage: baseMessage{
684+
Type: listQueuedWorkflowsMessage,
685+
RequestID: requestID,
686+
},
687+
ErrorMessage: &errorMsg,
672688
},
673-
Output: []listWorkflowsConductorResponseBody{},
674-
ErrorMessage: &errorMsg,
689+
Output: []listWorkflowsConductorResponseBody{},
675690
}
676691
return c.sendResponse(response, "list workflows response")
677692
}
@@ -693,9 +708,11 @@ func (c *Conductor) handleListQueuedWorkflowsRequest(data []byte, requestID stri
693708
}
694709

695710
response := listWorkflowsConductorResponse{
696-
baseMessage: baseMessage{
697-
Type: listWorkflowsMessage,
698-
RequestID: requestID,
711+
baseResponse: baseResponse{
712+
baseMessage: baseMessage{
713+
Type: listWorkflowsMessage,
714+
RequestID: requestID,
715+
},
699716
},
700717
Output: formattedWorkflows,
701718
}
@@ -718,12 +735,14 @@ func (c *Conductor) handleListStepsRequest(data []byte, requestID string) error
718735
c.logger.Error("Failed to list workflow steps", "workflow_id", req.WorkflowID, "error", err)
719736
errorMsg := fmt.Sprintf("failed to list workflow steps: %v", err)
720737
response := listStepsConductorResponse{
721-
baseMessage: baseMessage{
722-
Type: listStepsMessage,
723-
RequestID: requestID,
738+
baseResponse: baseResponse{
739+
baseMessage: baseMessage{
740+
Type: listStepsMessage,
741+
RequestID: requestID,
742+
},
743+
ErrorMessage: &errorMsg,
724744
},
725-
Output: nil,
726-
ErrorMessage: &errorMsg,
745+
Output: nil,
727746
}
728747
return c.sendResponse(response, "list steps response")
729748
}
@@ -739,17 +758,18 @@ func (c *Conductor) handleListStepsRequest(data []byte, requestID string) error
739758
}
740759

741760
response := listStepsConductorResponse{
742-
baseMessage: baseMessage{
743-
Type: listStepsMessage,
744-
RequestID: requestID,
761+
baseResponse: baseResponse{
762+
baseMessage: baseMessage{
763+
Type: listStepsMessage,
764+
RequestID: requestID,
765+
},
745766
},
746767
Output: formattedSteps,
747768
}
748769

749770
return c.sendResponse(response, "list steps response")
750771
}
751772

752-
753773
// handleGetWorkflowRequest handles get workflow requests from the conductor
754774
func (c *Conductor) handleGetWorkflowRequest(data []byte, requestID string) error {
755775
var req getWorkflowConductorRequest
@@ -765,12 +785,14 @@ func (c *Conductor) handleGetWorkflowRequest(data []byte, requestID string) erro
765785
c.logger.Error("Failed to get workflow", "workflow_id", req.WorkflowID, "error", err)
766786
errorMsg := fmt.Sprintf("failed to get workflow: %v", err)
767787
response := getWorkflowConductorResponse{
768-
baseMessage: baseMessage{
769-
Type: getWorkflowMessage,
770-
RequestID: requestID,
788+
baseResponse: baseResponse{
789+
baseMessage: baseMessage{
790+
Type: getWorkflowMessage,
791+
RequestID: requestID,
792+
},
793+
ErrorMessage: &errorMsg,
771794
},
772-
Output: nil,
773-
ErrorMessage: &errorMsg,
795+
Output: nil,
774796
}
775797
return c.sendResponse(response, "get workflow response")
776798
}
@@ -783,9 +805,11 @@ func (c *Conductor) handleGetWorkflowRequest(data []byte, requestID string) erro
783805
}
784806

785807
response := getWorkflowConductorResponse{
786-
baseMessage: baseMessage{
787-
Type: getWorkflowMessage,
788-
RequestID: requestID,
808+
baseResponse: baseResponse{
809+
baseMessage: baseMessage{
810+
Type: getWorkflowMessage,
811+
RequestID: requestID,
812+
},
789813
},
790814
Output: formattedWorkflow,
791815
}
@@ -832,12 +856,14 @@ func (c *Conductor) handleForkWorkflowRequest(data []byte, requestID string) err
832856
}
833857

834858
response := forkWorkflowConductorResponse{
835-
baseMessage: baseMessage{
836-
Type: forkWorkflowMessage,
837-
RequestID: requestID,
859+
baseResponse: baseResponse{
860+
baseMessage: baseMessage{
861+
Type: forkWorkflowMessage,
862+
RequestID: requestID,
863+
},
864+
ErrorMessage: errorMsg,
838865
},
839866
NewWorkflowID: newWorkflowID,
840-
ErrorMessage: errorMsg,
841867
}
842868

843869
return c.sendResponse(response, "fork workflow response")
@@ -870,12 +896,14 @@ func (c *Conductor) handleExistPendingWorkflowsRequest(data []byte, requestID st
870896
}
871897

872898
response := existPendingWorkflowsConductorResponse{
873-
baseMessage: baseMessage{
874-
Type: existPendingWorkflowsMessage,
875-
RequestID: requestID,
899+
baseResponse: baseResponse{
900+
baseMessage: baseMessage{
901+
Type: existPendingWorkflowsMessage,
902+
RequestID: requestID,
903+
},
904+
ErrorMessage: errorMsg,
876905
},
877-
Exist: len(workflows) > 0,
878-
ErrorMessage: errorMsg,
906+
Exist: len(workflows) > 0,
879907
}
880908

881909
return c.sendResponse(response, "exist pending workflows response")

0 commit comments

Comments
 (0)