Skip to content
This repository was archived by the owner on Dec 6, 2025. It is now read-only.

Commit 7d5ac76

Browse files
committed
updatesubtask
1 parent 7d79d6a commit 7d5ac76

File tree

12 files changed

+359
-295
lines changed

12 files changed

+359
-295
lines changed

gen/dart/lib/services/tasks_svc/v1/task_svc.pb.dart

Lines changed: 0 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gen/dart/lib/services/tasks_svc/v1/task_svc.pbjson.dart

Lines changed: 1 addition & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gen/go/services/tasks_svc/v1/task_svc.pb.go

Lines changed: 173 additions & 185 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gen/ts/services/tasks_svc/v1/task_svc_pb.d.ts

Lines changed: 0 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gen/ts/services/tasks_svc/v1/task_svc_pb.js

Lines changed: 1 addition & 49 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/services/tasks_svc/v1/task_svc.proto

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,6 @@ message AssignTaskResponse {
219219
message UnassignTaskRequest {
220220
string task_id = 1; // @gotags: validate:"uuid4"
221221
string user_id = 2; // @gotags: validate:"uuid4"
222-
optional string consistency = 3; // no conflict detection, if not provided
223222
}
224223

225224
message UnassignTaskResponse {

services/tasks-svc/internal/task/aggregate/aggregate.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
pb "gen/services/tasks_svc/v1"
77
"github.com/google/uuid"
88
"hwes"
9+
"hwutil"
910
taskEventsV1 "tasks-svc/internal/task/events/v1"
1011
"tasks-svc/internal/task/models"
1112
"time"
@@ -50,6 +51,17 @@ func LoadTaskAggregateWithSnapshotAt(ctx context.Context, as hwes.AggregateStore
5051
}
5152

5253
task := *taskAggregate.Task // deref copies model
54+
55+
// also copy pointer values
56+
if task.DueAt != nil {
57+
task.DueAt = hwutil.PtrTo(*task.DueAt)
58+
}
59+
subtasks := make(map[uuid.UUID]models.Subtask)
60+
for key, value := range task.Subtasks {
61+
subtasks[key] = value
62+
}
63+
task.Subtasks = subtasks
64+
5365
snapshot = &task
5466
}
5567

services/tasks-svc/internal/task/api/grpc.go

Lines changed: 62 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
pb "gen/services/tasks_svc/v1"
99
"github.com/google/uuid"
1010
zlog "github.com/rs/zerolog"
11-
"github.com/rs/zerolog/log"
1211
"google.golang.org/grpc/codes"
1312
"google.golang.org/grpc/status"
1413
"google.golang.org/protobuf/proto"
@@ -18,6 +17,7 @@ import (
1817
"hwutil"
1918
"strconv"
2019
"tasks-svc/internal/task/handlers"
20+
"tasks-svc/internal/task/models"
2121
"tasks-svc/internal/util"
2222
"time"
2323
)
@@ -204,7 +204,7 @@ func (s *TaskGrpcService) AssignTask(ctx context.Context, req *pb.AssignTaskRequ
204204

205205
for i := 0; true; i++ {
206206
if i > 10 {
207-
log.Warn().Msg("UpdatePatient: conflict circuit breaker triggered")
207+
zlog.Ctx(ctx).Warn().Msg("UpdatePatient: conflict circuit breaker triggered")
208208
return nil, fmt.Errorf("failed conflict resolution")
209209
}
210210

@@ -526,9 +526,66 @@ func (s *TaskGrpcService) UpdateSubtask(ctx context.Context, req *pb.UpdateSubta
526526
return nil, err
527527
}
528528

529-
consistency, err := s.handlers.Commands.V1.UpdateSubtask(ctx, taskID, subtaskID, req.Subtask.Name, req.Subtask.Done)
530-
if err != nil {
531-
return nil, err
529+
expConsistency, ok := hwutil.ParseConsistency(req.TaskConsistency)
530+
if !ok {
531+
return nil, common.UnparsableConsistencyError(ctx, "task_consistency")
532+
}
533+
534+
var consistency uint64
535+
536+
for i := 0; true; i++ {
537+
if i > 10 {
538+
zlog.Ctx(ctx).Warn().Msg("UpdateSubtask: conflict circuit breaker triggered")
539+
return nil, fmt.Errorf("failed conflict resolution")
540+
}
541+
542+
c, conflict, err := s.handlers.Commands.V1.UpdateSubtask(ctx, taskID, subtaskID, req.Subtask.Name, req.Subtask.Done, expConsistency)
543+
if err != nil {
544+
return nil, err
545+
}
546+
consistency = c
547+
548+
if conflict == nil {
549+
break
550+
}
551+
552+
conflicts := make(map[string]*commonpb.AttributeConflict)
553+
554+
var was models.Subtask
555+
if w, ok := conflict.Was.Subtasks[subtaskID]; ok {
556+
was = w
557+
} else {
558+
return nil, fmt.Errorf("subtask did not exist at expConsistency")
559+
}
560+
561+
is := conflict.Is.Subtasks[subtaskID] // handler would have failed if non-existent
562+
563+
// TODO: find a generic approach
564+
nameUpdateRequested := req.Subtask.Name != nil && *req.Subtask.Name != is.Name
565+
nameAlreadyUpdated := was.Name != is.Name
566+
if nameUpdateRequested && nameAlreadyUpdated {
567+
conflicts["name"], err = util.AttributeConflict(
568+
wrapperspb.String(is.Name),
569+
wrapperspb.String(*req.Subtask.Name),
570+
)
571+
if err != nil {
572+
return nil, err
573+
}
574+
}
575+
576+
if len(conflicts) != 0 {
577+
return &pb.UpdateSubtaskResponse{
578+
Conflict: &commonpb.Conflict{ConflictingAttributes: conflicts},
579+
TaskConsistency: strconv.FormatUint(conflict.Consistency, 10),
580+
}, nil
581+
}
582+
583+
// bool done can never cause a problem
584+
// the user expects done = B, and sets it to \neg B
585+
// so either that is the case still, or the update will do nothing anyway
586+
587+
// no conflict? retry with new consistency
588+
expConsistency = &conflict.Consistency
532589
}
533590

534591
return &pb.UpdateSubtaskResponse{

services/tasks-svc/internal/task/commands/v1/assign_task.go

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,19 @@ import (
55
"github.com/google/uuid"
66
"hwes"
77
"tasks-svc/internal/task/aggregate"
8-
"tasks-svc/internal/task/models"
98
)
109

11-
type AssignTaskConflict struct {
12-
Consistency uint64
13-
Was *models.Task
14-
Is *models.Task
15-
}
16-
17-
type AssignTaskCommandHandler func(ctx context.Context, taskID, userID uuid.UUID, expConsistency *uint64) (uint64, *AssignTaskConflict, error)
10+
type AssignTaskCommandHandler func(ctx context.Context, taskID, userID uuid.UUID, expConsistency *uint64) (uint64, *UpdateTaskConflict, error)
1811

1912
func NewAssignTaskCommandHandler(as hwes.AggregateStore) AssignTaskCommandHandler {
20-
return func(ctx context.Context, taskID, userID uuid.UUID, expConsistency *uint64) (uint64, *AssignTaskConflict, error) {
13+
return func(ctx context.Context, taskID, userID uuid.UUID, expConsistency *uint64) (uint64, *UpdateTaskConflict, error) {
2114
task, oldState, err := aggregate.LoadTaskAggregateWithSnapshotAt(ctx, as, taskID, expConsistency)
2215
if err != nil {
2316
return 0, nil, err
2417
}
2518

2619
if expConsistency != nil && *expConsistency != task.GetVersion() {
27-
return 0, &AssignTaskConflict{
20+
return 0, &UpdateTaskConflict{
2821
Consistency: task.GetVersion(),
2922
Was: oldState,
3023
Is: task.Task,

services/tasks-svc/internal/task/commands/v1/update_subtask.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,32 +8,41 @@ import (
88
"tasks-svc/internal/task/aggregate"
99
)
1010

11-
type UpdateSubtaskCommandHandler func(ctx context.Context, taskID, subtaskID uuid.UUID, name *string, done *bool) (uint64, error)
11+
type UpdateSubtaskCommandHandler func(ctx context.Context, taskID, subtaskID uuid.UUID, name *string, done *bool, expConsistency *uint64) (uint64, *UpdateTaskConflict, error)
1212

1313
func NewUpdateSubtaskCommandHandler(as hwes.AggregateStore) UpdateSubtaskCommandHandler {
14-
return func(ctx context.Context, taskID, subtaskID uuid.UUID, name *string, done *bool) (uint64, error) {
15-
a, err := aggregate.LoadTaskAggregate(ctx, as, taskID)
14+
return func(ctx context.Context, taskID, subtaskID uuid.UUID, name *string, done *bool, expConsistency *uint64) (uint64, *UpdateTaskConflict, error) {
15+
a, oldState, err := aggregate.LoadTaskAggregateWithSnapshotAt(ctx, as, taskID, expConsistency)
1616
if err != nil {
17-
return 0, err
17+
return 0, nil, err
1818
}
1919

2020
currentSubtask, found := a.Task.Subtasks[subtaskID]
2121
if !found {
22-
return 0, fmt.Errorf("subtask with ID: %s not found on Task with ID: %s", subtaskID, taskID)
22+
return 0, nil, fmt.Errorf("subtask with ID: %s not found on Task with ID: %s", subtaskID, taskID)
23+
}
24+
25+
if expConsistency != nil && *expConsistency != a.GetVersion() {
26+
return 0, &UpdateTaskConflict{
27+
Consistency: a.GetVersion(),
28+
Was: oldState,
29+
Is: a.Task,
30+
}, err
2331
}
2432

2533
if name != nil && *name != currentSubtask.Name {
2634
if err := a.UpdateSubtaskName(ctx, subtaskID, *name); err != nil {
27-
return 0, err
35+
return 0, nil, err
2836
}
2937
}
3038

3139
if done != nil && *done != currentSubtask.Done {
3240
if err := a.UpdateSubtaskDone(ctx, subtaskID, *done); err != nil {
33-
return 0, err
41+
return 0, nil, err
3442
}
3543
}
3644

37-
return as.Save(ctx, a)
45+
consistency, err := as.Save(ctx, a)
46+
return consistency, nil, err
3847
}
3948
}

0 commit comments

Comments
 (0)