Skip to content

Commit ee3c212

Browse files
committed
nits
1 parent d4e65bf commit ee3c212

File tree

2 files changed

+46
-46
lines changed

2 files changed

+46
-46
lines changed

dbos/conductor.go

Lines changed: 43 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ func (c *Conductor) handleExecutorInfoRequest(data []byte, requestID string) err
387387
Hostname: &hostname,
388388
}
389389

390-
return c.sendResponse(response, "executor info response")
390+
return c.sendResponse(response, string(executorInfo))
391391
}
392392

393393
// handleRecoveryRequest handles recovery requests from the conductor
@@ -424,7 +424,7 @@ func (c *Conductor) handleRecoveryRequest(data []byte, requestID string) error {
424424
Success: success,
425425
}
426426

427-
return c.sendResponse(response, "recovery response")
427+
return c.sendResponse(response, string(recoveryMessage))
428428
}
429429

430430
// handleCancelWorkflowRequest handles cancel workflow requests from the conductor
@@ -460,7 +460,7 @@ func (c *Conductor) handleCancelWorkflowRequest(data []byte, requestID string) e
460460
Success: success,
461461
}
462462

463-
return c.sendResponse(response, "cancel workflow response")
463+
return c.sendResponse(response, string(cancelWorkflowMessage))
464464
}
465465

466466
// handleResumeWorkflowRequest handles resume workflow requests from the conductor
@@ -497,7 +497,7 @@ func (c *Conductor) handleResumeWorkflowRequest(data []byte, requestID string) e
497497
Success: success,
498498
}
499499

500-
return c.sendResponse(response, "resume workflow response")
500+
return c.sendResponse(response, string(resumeWorkflowMessage))
501501
}
502502

503503
// handleRetentionRequest handles retention policy requests from the conductor
@@ -566,37 +566,7 @@ func (c *Conductor) handleRetentionRequest(data []byte, requestID string) error
566566
Success: success,
567567
}
568568

569-
return c.sendResponse(response, "retention response")
570-
}
571-
572-
// sendResponse sends a response to the conductor via websocket
573-
func (c *Conductor) sendResponse(response any, responseType string) error {
574-
if c.conn == nil {
575-
return fmt.Errorf("no connection")
576-
}
577-
578-
data, err := json.Marshal(response)
579-
if err != nil {
580-
return fmt.Errorf("failed to marshal %s: %w", responseType, err)
581-
}
582-
583-
c.logger.Debug("Sending response", "type", responseType, "len", len(data))
584-
585-
c.writeMu.Lock()
586-
defer c.writeMu.Unlock()
587-
588-
if err := c.conn.SetWriteDeadline(time.Now().Add(5 * time.Second)); err != nil {
589-
c.logger.Warn("Failed to set write deadline", "type", responseType, "error", err)
590-
}
591-
if err := c.conn.WriteMessage(websocket.TextMessage, data); err != nil {
592-
c.logger.Error("Failed to send response", "type", responseType, "error", err)
593-
return fmt.Errorf("failed to send message: %w", err)
594-
}
595-
if err := c.conn.SetWriteDeadline(time.Time{}); err != nil {
596-
c.logger.Warn("Failed to clear write deadline", "type", responseType, "error", err)
597-
}
598-
599-
return nil
569+
return c.sendResponse(response, string(retentionMessage))
600570
}
601571

602572
// handleListWorkflowsRequest handles list workflows requests from the conductor
@@ -674,7 +644,7 @@ func (c *Conductor) handleListWorkflowsRequest(data []byte, requestID string) er
674644
Output: formattedWorkflows,
675645
}
676646

677-
return c.sendResponse(response, "list workflows response")
647+
return c.sendResponse(response, string(listWorkflowsMessage))
678648
}
679649

680650
// handleListQueuedWorkflowsRequest handles list queued workflows requests from the conductor
@@ -740,7 +710,7 @@ func (c *Conductor) handleListQueuedWorkflowsRequest(data []byte, requestID stri
740710
},
741711
Output: []listWorkflowsConductorResponseBody{},
742712
}
743-
return c.sendResponse(response, "list workflows response")
713+
return c.sendResponse(response, string(listQueuedWorkflowsMessage))
744714
}
745715

746716
// If no queue name was specified, only include workflows that have a queue name
@@ -769,7 +739,7 @@ func (c *Conductor) handleListQueuedWorkflowsRequest(data []byte, requestID stri
769739
Output: formattedWorkflows,
770740
}
771741

772-
return c.sendResponse(response, "list workflows response")
742+
return c.sendResponse(response, string(listQueuedWorkflowsMessage))
773743
}
774744

775745
// handleListStepsRequest handles list steps requests from the conductor
@@ -796,7 +766,7 @@ func (c *Conductor) handleListStepsRequest(data []byte, requestID string) error
796766
},
797767
Output: nil,
798768
}
799-
return c.sendResponse(response, "list steps response")
769+
return c.sendResponse(response, string(listStepsMessage))
800770
}
801771

802772
// Convert steps to response format
@@ -819,7 +789,7 @@ func (c *Conductor) handleListStepsRequest(data []byte, requestID string) error
819789
Output: formattedSteps,
820790
}
821791

822-
return c.sendResponse(response, "list steps response")
792+
return c.sendResponse(response, string(listStepsMessage))
823793
}
824794

825795
// handleGetWorkflowRequest handles get workflow requests from the conductor
@@ -866,7 +836,7 @@ func (c *Conductor) handleGetWorkflowRequest(data []byte, requestID string) erro
866836
Output: formattedWorkflow,
867837
}
868838

869-
return c.sendResponse(response, "get workflow response")
839+
return c.sendResponse(response, string(getWorkflowMessage))
870840
}
871841

872842
// handleForkWorkflowRequest handles fork workflow requests from the conductor
@@ -922,7 +892,7 @@ func (c *Conductor) handleForkWorkflowRequest(data []byte, requestID string) err
922892
NewWorkflowID: newWorkflowID,
923893
}
924894

925-
return c.sendResponse(response, "fork workflow response")
895+
return c.sendResponse(response, string(forkWorkflowMessage))
926896
}
927897

928898
// handleExistPendingWorkflowsRequest handles exist pending workflows requests from the conductor
@@ -962,7 +932,7 @@ func (c *Conductor) handleExistPendingWorkflowsRequest(data []byte, requestID st
962932
Exist: len(workflows) > 0,
963933
}
964934

965-
return c.sendResponse(response, "exist pending workflows response")
935+
return c.sendResponse(response, string(existPendingWorkflowsMessage))
966936
}
967937

968938
// handleUnknownMessageType sends an error response for unknown message types
@@ -1003,3 +973,33 @@ func (c *Conductor) handleUnknownMessageType(requestID string, msgType messageTy
1003973

1004974
return nil
1005975
}
976+
977+
// sendResponse sends a response to the conductor via websocket
978+
func (c *Conductor) sendResponse(response any, responseType string) error {
979+
if c.conn == nil {
980+
return fmt.Errorf("no connection")
981+
}
982+
983+
data, err := json.Marshal(response)
984+
if err != nil {
985+
return fmt.Errorf("failed to marshal %s: %w", responseType, err)
986+
}
987+
988+
c.logger.Debug("Sending response", "type", responseType, "len", len(data))
989+
990+
c.writeMu.Lock()
991+
defer c.writeMu.Unlock()
992+
993+
if err := c.conn.SetWriteDeadline(time.Now().Add(5 * time.Second)); err != nil {
994+
c.logger.Warn("Failed to set write deadline", "type", responseType, "error", err)
995+
}
996+
if err := c.conn.WriteMessage(websocket.TextMessage, data); err != nil {
997+
c.logger.Error("Failed to send response", "type", responseType, "error", err)
998+
return fmt.Errorf("failed to send message: %w", err)
999+
}
1000+
if err := c.conn.SetWriteDeadline(time.Time{}); err != nil {
1001+
c.logger.Warn("Failed to clear write deadline", "type", responseType, "error", err)
1002+
}
1003+
1004+
return nil
1005+
}

dbos/system_database.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2174,14 +2174,14 @@ func (s *sysDB) dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInpu
21742174
input.executorID,
21752175
time.Now().UnixMilli(),
21762176
id).Scan(&retWorkflow.name, &inputString)
2177+
if err != nil {
2178+
return nil, fmt.Errorf("failed to update workflow %s during dequeue: %w", id, err)
2179+
}
21772180

21782181
if inputString != nil && len(*inputString) > 0 {
21792182
retWorkflow.input = *inputString
21802183
}
21812184

2182-
if err != nil {
2183-
return nil, fmt.Errorf("failed to update workflow %s during dequeue: %w", id, err)
2184-
}
21852185
retWorkflows = append(retWorkflows, retWorkflow)
21862186
}
21872187

0 commit comments

Comments
 (0)