Skip to content

Commit f073190

Browse files
committed
Small cleanup
1 parent 450bf5b commit f073190

File tree

3 files changed

+4
-47
lines changed

3 files changed

+4
-47
lines changed

backend/redis/instance.go

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -164,38 +164,6 @@ type instanceState struct {
164164
LastSequenceID int64 `json:"last_sequence_id,omitempty"`
165165
}
166166

167-
func createInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, metadata *metadata.WorkflowMetadata) error {
168-
key := instanceKey(instance)
169-
170-
createdAt := time.Now()
171-
172-
b, err := json.Marshal(&instanceState{
173-
Instance: instance,
174-
State: core.WorkflowInstanceStateActive,
175-
Metadata: metadata,
176-
CreatedAt: createdAt,
177-
})
178-
if err != nil {
179-
return fmt.Errorf("marshaling instance state: %w", err)
180-
}
181-
182-
p.SetNX(ctx, key, string(b), 0)
183-
184-
// The newly created instance is going to be the active execution
185-
if err := setActiveInstanceExecutionP(ctx, p, instance); err != nil {
186-
return fmt.Errorf("setting active instance execution: %w", err)
187-
}
188-
189-
p.ZAdd(ctx, instancesByCreation(), redis.Z{
190-
Member: instanceSegment(instance),
191-
Score: float64(createdAt.UnixMilli()),
192-
})
193-
194-
p.SAdd(ctx, instancesActive(), instanceSegment(instance))
195-
196-
return nil
197-
}
198-
199167
func readInstance(ctx context.Context, rdb redis.UniversalClient, instanceKey string) (*instanceState, error) {
200168
p := rdb.Pipeline()
201169

@@ -246,14 +214,3 @@ func readActiveInstanceExecution(ctx context.Context, rdb redis.UniversalClient,
246214

247215
return instance, nil
248216
}
249-
250-
func setActiveInstanceExecutionP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance) error {
251-
key := activeInstanceExecutionKey(instance.InstanceID)
252-
253-
b, err := json.Marshal(instance)
254-
if err != nil {
255-
return fmt.Errorf("marshaling instance: %w", err)
256-
}
257-
258-
return p.Set(ctx, key, string(b), 0).Err()
259-
}

internal/worker/worker.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,9 @@ func (w *Worker[Task, TaskResult]) dispatcher() {
135135

136136
// Create new context to allow tasks to complete when root context is canceled
137137
taskCtx := context.Background()
138-
w.handle(taskCtx, t)
138+
if err := w.handle(taskCtx, t); err != nil {
139+
w.logger.ErrorContext(taskCtx, "error handling task", "error", err)
140+
}
139141

140142
if sem != nil {
141143
<-sem

internal/workflow/executor.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -659,13 +659,11 @@ func (e *executor) handleSideEffectResult(event *history.Event, a *history.SideE
659659
return e.workflow.Continue()
660660
}
661661

662-
func (e *executor) workflowCompleted(result payload.Payload, wfErr error) error {
662+
func (e *executor) workflowCompleted(result payload.Payload, wfErr error) {
663663
eventId := e.workflowState.GetNextScheduleEventID()
664664

665665
cmd := command.NewCompleteWorkflowCommand(eventId, e.workflowState.Instance(), result, workflowerrors.FromError(wfErr))
666666
e.workflowState.AddCommand(cmd)
667-
668-
return nil
669667
}
670668

671669
func (e *executor) workflowRestarted(result payload.Payload, continueAsNew *continueasnew.Error) {

0 commit comments

Comments
 (0)