Skip to content
This repository was archived by the owner on Dec 6, 2025. It is now read-only.

Commit 0bee7d1

Browse files
committed
UpdateTask
1 parent 40b3137 commit 0bee7d1

File tree

5 files changed

+474
-17
lines changed

5 files changed

+474
-17
lines changed

services/tasks-svc/internal/task/aggregate/aggregate.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,28 @@ func LoadTaskAggregate(ctx context.Context, as hwes.AggregateStore, id uuid.UUID
3939
return taskAggregate, nil
4040
}
4141

42+
func LoadTaskAggregateWithSnapshotAt(ctx context.Context, as hwes.AggregateStore, id uuid.UUID, pauseAt *uint64) (*TaskAggregate, *models.Task, error) {
43+
taskAggregate := NewTaskAggregate(id)
44+
var snapshot *models.Task
45+
46+
if pauseAt != nil {
47+
// load pauseAt+1-many events (version is 0-indexed)
48+
if err := as.LoadN(ctx, taskAggregate, *pauseAt+1); err != nil {
49+
return nil, nil, err
50+
}
51+
52+
task := *taskAggregate.Task // deref copies model
53+
snapshot = &task
54+
}
55+
56+
// continue loading all other events
57+
if err := as.Load(ctx, taskAggregate); err != nil {
58+
return nil, nil, err
59+
}
60+
61+
return taskAggregate, snapshot, nil
62+
}
63+
4264
func (a *TaskAggregate) initEventListeners() {
4365
a.
4466
RegisterEventListener(taskEventsV1.TaskCreated, a.onTaskCreated).

services/tasks-svc/internal/task/api/grpc.go

Lines changed: 109 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,22 @@ package api
33
import (
44
"common"
55
"context"
6+
"fmt"
7+
commonpb "gen/libs/common/v1"
68
pb "gen/services/tasks_svc/v1"
79
"github.com/google/uuid"
10+
zlog "github.com/rs/zerolog"
811
"google.golang.org/grpc/codes"
912
"google.golang.org/grpc/status"
13+
"google.golang.org/protobuf/proto"
1014
"google.golang.org/protobuf/types/known/timestamppb"
15+
"google.golang.org/protobuf/types/known/wrapperspb"
1116
"hwes"
1217
"hwutil"
1318
"strconv"
1419
"tasks-svc/internal/task/handlers"
20+
"tasks-svc/internal/util"
21+
"time"
1522
)
1623

1724
type TaskGrpcService struct {
@@ -61,15 +68,114 @@ func (s *TaskGrpcService) CreateTask(ctx context.Context, req *pb.CreateTaskRequ
6168
}, nil
6269
}
6370

71+
func timeAlreadyUpdated(was, is *time.Time) bool {
72+
if was != nil && is != nil {
73+
return !was.Round(time.Second).Equal(is.Round(time.Second))
74+
}
75+
76+
return true
77+
}
78+
6479
func (s *TaskGrpcService) UpdateTask(ctx context.Context, req *pb.UpdateTaskRequest) (*pb.UpdateTaskResponse, error) {
80+
log := zlog.Ctx(ctx)
81+
6582
taskID, err := uuid.Parse(req.GetId())
6683
if err != nil {
6784
return nil, err
6885
}
6986

70-
consistency, err := s.handlers.Commands.V1.UpdateTask(ctx, taskID, req.Name, req.Description, req.Status, req.Public, req.DueAt)
71-
if err != nil {
72-
return nil, err
87+
expConsistency, ok := hwutil.ParseConsistency(req.Consistency)
88+
if !ok {
89+
return nil, common.UnparsableConsistencyError(ctx, "consistency")
90+
}
91+
92+
var consistency uint64
93+
94+
for i := 0; true; i++ {
95+
if i > 10 {
96+
log.Warn().Msg("UpdatePatient: conflict circuit breaker triggered")
97+
return nil, fmt.Errorf("failed conflict resolution")
98+
}
99+
100+
c, conflict, err := s.handlers.Commands.V1.UpdateTask(ctx,
101+
taskID, req.Name, req.Description, req.Status, req.Public, req.DueAt, expConsistency)
102+
if err != nil {
103+
return nil, err
104+
}
105+
consistency = c
106+
107+
if conflict == nil {
108+
break
109+
}
110+
conflicts := make(map[string]*commonpb.AttributeConflict)
111+
112+
// TODO: find a generic approach
113+
nameUpdateRequested := req.Name != nil && *req.Name != conflict.Is.Name
114+
nameAlreadyUpdated := conflict.Was.Name != conflict.Is.Name
115+
if nameUpdateRequested && nameAlreadyUpdated {
116+
conflicts["name"], err = util.AttributeConflict(
117+
wrapperspb.String(conflict.Is.Name),
118+
wrapperspb.String(*req.Name),
119+
)
120+
if err != nil {
121+
return nil, err
122+
}
123+
}
124+
125+
descrUpdateRequested := req.Description != nil && *req.Description != conflict.Is.Description
126+
descrAlreadyUpdated := conflict.Was.Description != conflict.Is.Description
127+
if descrUpdateRequested && descrAlreadyUpdated {
128+
conflicts["description"], err = util.AttributeConflict(
129+
wrapperspb.String(conflict.Is.Description),
130+
wrapperspb.String(*req.Description),
131+
)
132+
if err != nil {
133+
return nil, fmt.Errorf("could not marshall description conflict: %w", err)
134+
}
135+
}
136+
137+
dueUpdateRequested := req.DueAt != nil &&
138+
(conflict.Is.DueAt == nil || !req.DueAt.AsTime().Round(time.Second).Equal(conflict.Is.DueAt.Round(time.Second)))
139+
dueAlreadyUpdated := timeAlreadyUpdated(conflict.Was.DueAt, conflict.Is.DueAt)
140+
if dueUpdateRequested && dueAlreadyUpdated {
141+
var is proto.Message = nil
142+
if conflict.Is.DueAt != nil {
143+
is = timestamppb.New(*conflict.Is.DueAt)
144+
}
145+
conflicts["due_at"], err = util.AttributeConflict(
146+
is,
147+
req.DueAt,
148+
)
149+
if err != nil {
150+
return nil, fmt.Errorf("could not marshall due_at conflict: %w", err)
151+
}
152+
}
153+
154+
statusUpdateRequested := req.Status != nil && *req.Status != conflict.Is.Status
155+
statusAlreadyUpdated := conflict.Was.Status != conflict.Is.Status
156+
if statusUpdateRequested && statusAlreadyUpdated {
157+
conflicts["status"], err = util.AttributeConflict(
158+
wrapperspb.Int32(int32(conflict.Is.Status)),
159+
wrapperspb.Int32(int32(*req.Status)),
160+
)
161+
if err != nil {
162+
return nil, fmt.Errorf("could not marshall status conflict: %w", err)
163+
}
164+
}
165+
166+
// bool public can never cause a problem
167+
// the user expects public = B, and sets it to \neg B
168+
// so either that is the case still, or the update will do nothing anyway
169+
170+
if len(conflicts) != 0 {
171+
return &pb.UpdateTaskResponse{
172+
Conflict: &commonpb.Conflict{ConflictingAttributes: conflicts},
173+
Consistency: strconv.FormatUint(conflict.Consistency, 10),
174+
}, nil
175+
}
176+
177+
// no conflict? retry with new consistency
178+
expConsistency = &conflict.Consistency
73179
}
74180

75181
return &pb.UpdateTaskResponse{

services/tasks-svc/internal/task/commands/v1/update_task.go

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,15 @@ import (
77
"google.golang.org/protobuf/types/known/timestamppb"
88
"hwes"
99
"tasks-svc/internal/task/aggregate"
10+
"tasks-svc/internal/task/models"
1011
)
1112

13+
type UpdateTaskConflict struct {
14+
Consistency uint64
15+
Was *models.Task
16+
Is *models.Task
17+
}
18+
1219
type UpdateTaskCommandHandler func(
1320
ctx context.Context,
1421
taskID uuid.UUID,
@@ -17,45 +24,63 @@ type UpdateTaskCommandHandler func(
1724
status *pb.TaskStatus,
1825
public *bool,
1926
dueAt *timestamppb.Timestamp,
20-
) (uint64, error)
27+
expConsistency *uint64,
28+
) (uint64, *UpdateTaskConflict, error)
2129

2230
func NewUpdateTaskCommandHandler(as hwes.AggregateStore) UpdateTaskCommandHandler {
23-
return func(ctx context.Context, taskID uuid.UUID, name *string, description *string, status *pb.TaskStatus, public *bool, dueAt *timestamppb.Timestamp) (uint64, error) {
24-
a, err := aggregate.LoadTaskAggregate(ctx, as, taskID)
31+
return func(ctx context.Context,
32+
taskID uuid.UUID,
33+
name *string,
34+
description *string,
35+
status *pb.TaskStatus,
36+
public *bool,
37+
dueAt *timestamppb.Timestamp,
38+
expConsistency *uint64,
39+
) (uint64, *UpdateTaskConflict, error) {
40+
a, oldState, err := aggregate.LoadTaskAggregateWithSnapshotAt(ctx, as, taskID, expConsistency)
2541
if err != nil {
26-
return 0, err
42+
return 0, nil, err
43+
}
44+
45+
if expConsistency != nil && *expConsistency != a.GetVersion() {
46+
return 0, &UpdateTaskConflict{
47+
Consistency: a.GetVersion(),
48+
Was: oldState,
49+
Is: a.Task,
50+
}, nil
2751
}
2852

2953
if name != nil {
3054
if err := a.UpdateName(ctx, *name); err != nil {
31-
return 0, err
55+
return 0, nil, err
3256
}
3357
}
3458

3559
if description != nil {
3660
if err := a.UpdateDescription(ctx, *description); err != nil {
37-
return 0, err
61+
return 0, nil, err
3862
}
3963
}
4064

4165
if status != nil {
4266
if err := a.UpdateStatus(ctx, *status); err != nil {
43-
return 0, err
67+
return 0, nil, err
4468
}
4569
}
4670

4771
if public != nil && a.Task.Public != *public {
4872
if err := a.UpdateTaskPublic(ctx, *public); err != nil {
49-
return 0, err
73+
return 0, nil, err
5074
}
5175
}
5276

5377
if dueAt != nil {
5478
if err := a.UpdateDueAt(ctx, dueAt.AsTime()); err != nil {
55-
return 0, err
79+
return 0, nil, err
5680
}
5781
}
5882

59-
return as.Save(ctx, a)
83+
consistency, err := as.Save(ctx, a)
84+
return consistency, nil, err
6085
}
6186
}

services/tasks-svc/stories/PatientCRUD_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"hwtesting"
1010
"hwutil"
1111
"strconv"
12+
"strings"
1213
"testing"
1314
"time"
1415
)
@@ -692,6 +693,7 @@ func TestUpdatePatientConflict(t *testing.T) {
692693
B := "B"
693694
C := "C"
694695

696+
// Only testing HumanReadableIdentifier
695697
testMatrix := []struct {
696698
was string
697699
is string
@@ -741,9 +743,9 @@ func TestUpdatePatientConflict(t *testing.T) {
741743
if o.expectConflict {
742744
conflict := updateRes.Conflict.ConflictingAttributes["human_readable_identifier"]
743745
assert.NotNil(t, conflict)
744-
exp := "is:{[type.googleapis.com/google.protobuf.StringValue]:{value:\"B\"}} " +
746+
exp := "is:{[type.googleapis.com/google.protobuf.StringValue]:{value:\"B\"}} " +
745747
"want:{[type.googleapis.com/google.protobuf.StringValue]:{value:\"C\"}}"
746-
assert.Equal(t, exp, conflict.String())
748+
assert.Equal(t, exp, strings.Replace(conflict.String(), " ", " ", 1))
747749
}
748750
})
749751
}
@@ -830,10 +832,10 @@ func TestAssignBedConflict(t *testing.T) {
830832

831833
exp := "want:{[type.googleapis.com/google.protobuf.StringValue]:{value:\"" + o.want + "\"}}"
832834
if o.is != nil {
833-
exp = "is:{[type.googleapis.com/google.protobuf.StringValue]:{value:\"" + *o.is + "\"}} " +
835+
exp = "is:{[type.googleapis.com/google.protobuf.StringValue]:{value:\"" + *o.is + "\"}} " +
834836
"want:{[type.googleapis.com/google.protobuf.StringValue]:{value:\"" + o.want + "\"}}"
835837
}
836-
assert.Equal(t, exp, conflict.String())
838+
assert.Equal(t, exp, strings.Replace(conflict.String(), " ", " ", 1))
837839
}
838840
})
839841
}

0 commit comments

Comments
 (0)