Skip to content

Commit cd7032e

Browse files
committed
implement default inmemory tasks management, reenable tests
Signed-off-by: Fabian Martinez <[email protected]>
1 parent 02f270f commit cd7032e

File tree

5 files changed

+17
-98
lines changed

5 files changed

+17
-98
lines changed

backend/postgres/postgres.go

Lines changed: 7 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/dapr/durabletask-go/api/helpers"
1515
"github.com/dapr/durabletask-go/api/protos"
1616
"github.com/dapr/durabletask-go/backend"
17+
"github.com/dapr/durabletask-go/backend/local"
1718
"github.com/dapr/durabletask-go/backend/runtimestate"
1819
"github.com/google/uuid"
1920
"google.golang.org/grpc/codes"
@@ -44,6 +45,7 @@ type postgresBackend struct {
4445
workerName string
4546
logger backend.Logger
4647
options *PostgresOptions
48+
*local.TasksBackend
4749
}
4850

4951
// NewPostgresOptions creates a new options object for the postgres backend provider.
@@ -79,10 +81,11 @@ func NewPostgresBackend(opts *PostgresOptions, logger backend.Logger) backend.Ba
7981
}
8082

8183
return &postgresBackend{
82-
db: nil,
83-
workerName: fmt.Sprintf("%s,%d,%s", hostname, pid, uuidStr),
84-
options: opts,
85-
logger: logger,
84+
db: nil,
85+
workerName: fmt.Sprintf("%s,%d,%s", hostname, pid, uuidStr),
86+
options: opts,
87+
logger: logger,
88+
TasksBackend: local.NewTasksBackend(),
8689
}
8790
}
8891

@@ -1129,35 +1132,3 @@ func (be *postgresBackend) String() string {
11291132
func (be *postgresBackend) RerunWorkflowFromEvent(ctx context.Context, req *backend.RerunWorkflowFromEventRequest) (api.InstanceID, error) {
11301133
return "", status.Error(codes.Unimplemented, "not implemented")
11311134
}
1132-
1133-
// CompleteOrchestratorTask completes the orchestrator task by saving the updated runtime state to durable storage.
1134-
func (be *postgresBackend) CompleteOrchestratorTask(context.Context, *protos.OrchestratorResponse) error {
1135-
return nil
1136-
}
1137-
1138-
// CancelOrchestratorTask cancels the orchestrator task so instances of WaitForOrchestratorCompletion will return an error.
1139-
func (be *postgresBackend) CancelOrchestratorTask(context.Context, api.InstanceID) error { return nil }
1140-
1141-
// WaitForOrchestratorCompletion blocks until the orchestrator completes and returns the final response.
1142-
//
1143-
// [api.ErrTaskCancelled] is returned if the task was cancelled.
1144-
func (be *postgresBackend) WaitForOrchestratorCompletion(context.Context, *protos.OrchestratorRequest) (*protos.OrchestratorResponse, error) {
1145-
return nil, nil
1146-
}
1147-
1148-
// CompleteActivityTask completes the activity task by saving the updated runtime state to durable storage.
1149-
func (be *postgresBackend) CompleteActivityTask(context.Context, *protos.ActivityResponse) error {
1150-
return nil
1151-
}
1152-
1153-
// CancelActivityTask cancels the activity task so instances of WaitForActivityCompletion will return an error.
1154-
func (be *postgresBackend) CancelActivityTask(context.Context, api.InstanceID, int32) error {
1155-
return nil
1156-
}
1157-
1158-
// WaitForActivityCompletion blocks until the activity completes and returns the final response.
1159-
//
1160-
// [api.ErrTaskCancelled] is returned if the task was cancelled.
1161-
func (be *postgresBackend) WaitForActivityCompletion(context.Context, *protos.ActivityRequest) (*protos.ActivityResponse, error) {
1162-
return nil, nil
1163-
}

backend/sqlite/sqlite.go

Lines changed: 7 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/dapr/durabletask-go/api/helpers"
1616
"github.com/dapr/durabletask-go/api/protos"
1717
"github.com/dapr/durabletask-go/backend"
18+
"github.com/dapr/durabletask-go/backend/local"
1819
"github.com/dapr/durabletask-go/backend/runtimestate"
1920
"github.com/google/uuid"
2021
"google.golang.org/grpc/codes"
@@ -45,9 +46,7 @@ type sqliteBackend struct {
4546
workerName string
4647
logger backend.Logger
4748
options *SqliteOptions
48-
49-
activityWorker *backend.TaskWorker[*backend.ActivityWorkItem]
50-
orchestrationWorker *backend.TaskWorker[*backend.OrchestrationWorkItem]
49+
*local.TasksBackend
5150
}
5251

5352
// NewSqliteOptions creates a new options object for the sqlite backend provider.
@@ -73,10 +72,11 @@ func NewSqliteBackend(opts *SqliteOptions, logger backend.Logger) backend.Backen
7372
uuidStr := uuid.NewString()
7473

7574
be := &sqliteBackend{
76-
db: nil,
77-
workerName: fmt.Sprintf("%s,%d,%s", hostname, pid, uuidStr),
78-
options: opts,
79-
logger: logger,
75+
db: nil,
76+
workerName: fmt.Sprintf("%s,%d,%s", hostname, pid, uuidStr),
77+
options: opts,
78+
logger: logger,
79+
TasksBackend: local.NewTasksBackend(),
8080
}
8181

8282
if opts == nil {
@@ -1114,35 +1114,3 @@ func (be *sqliteBackend) String() string {
11141114
func (be *sqliteBackend) RerunWorkflowFromEvent(ctx context.Context, req *backend.RerunWorkflowFromEventRequest) (api.InstanceID, error) {
11151115
return "", status.Error(codes.Unimplemented, "not implemented")
11161116
}
1117-
1118-
// CompleteOrchestratorTask completes the orchestrator task by saving the updated runtime state to durable storage.
1119-
func (be *sqliteBackend) CompleteOrchestratorTask(context.Context, *protos.OrchestratorResponse) error {
1120-
return nil
1121-
}
1122-
1123-
// CancelOrchestratorTask cancels the orchestrator task so instances of WaitForOrchestratorCompletion will return an error.
1124-
func (be *sqliteBackend) CancelOrchestratorTask(context.Context, api.InstanceID) error { return nil }
1125-
1126-
// WaitForOrchestratorCompletion blocks until the orchestrator completes and returns the final response.
1127-
//
1128-
// [api.ErrTaskCancelled] is returned if the task was cancelled.
1129-
func (be *sqliteBackend) WaitForOrchestratorCompletion(context.Context, *protos.OrchestratorRequest) (*protos.OrchestratorResponse, error) {
1130-
return new(protos.OrchestratorResponse), nil
1131-
}
1132-
1133-
// CompleteActivityTask completes the activity task by saving the updated runtime state to durable storage.
1134-
func (be *sqliteBackend) CompleteActivityTask(context.Context, *protos.ActivityResponse) error {
1135-
return nil
1136-
}
1137-
1138-
// CancelActivityTask cancels the activity task so instances of WaitForActivityCompletion will return an error.
1139-
func (be *sqliteBackend) CancelActivityTask(context.Context, api.InstanceID, int32) error {
1140-
return nil
1141-
}
1142-
1143-
// WaitForActivityCompletion blocks until the activity completes and returns the final response.
1144-
//
1145-
// [api.ErrTaskCancelled] is returned if the task was cancelled.
1146-
func (be *sqliteBackend) WaitForActivityCompletion(context.Context, *protos.ActivityRequest) (*protos.ActivityResponse, error) {
1147-
return new(protos.ActivityResponse), nil
1148-
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.23.1
44

55
require (
66
github.com/cenkalti/backoff/v4 v4.3.0
7-
github.com/dapr/kit v0.13.1-0.20250110192255-fb195706966f
7+
github.com/dapr/kit v0.15.3-0.20250616160611-598b032bce69
88
github.com/google/uuid v1.6.0
99
github.com/jackc/pgx/v5 v5.7.4
1010
github.com/stretchr/testify v1.10.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
22
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
3-
github.com/dapr/kit v0.13.1-0.20250110192255-fb195706966f h1:gugkO8r833phJ31v/HhCUOHxznAkuxdsC6KMh391NOE=
4-
github.com/dapr/kit v0.13.1-0.20250110192255-fb195706966f/go.mod h1:HwFsBKEbcyLanWlDZE7u/jnaDCD/tU+n3pkFNUctQNw=
3+
github.com/dapr/kit v0.15.3-0.20250616160611-598b032bce69 h1:I1Uoy3fn906AZZdG8+n8fHitgY7Wn9c+smz4WQdOy1Q=
4+
github.com/dapr/kit v0.15.3-0.20250616160611-598b032bce69/go.mod h1:6w2Pr38zOAtBn+ld/jknwI4kgMfwanCIcFVnPykdPZQ=
55
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
66
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
77
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

tests/grpc/grpc_test.go

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,6 @@ func startGrpcListener(t *testing.T, r *task.TaskRegistry) context.CancelFunc {
9191
}
9292

9393
func Test_Grpc_WaitForInstanceStart_Timeout(t *testing.T) {
94-
t.Skip("TODO: @joshvanl: re-enable after sqlite implementation of new backend funcs")
95-
9694
r := task.NewTaskRegistry()
9795
r.AddOrchestratorN("WaitForInstanceStartThrowsException", func(ctx *task.OrchestrationContext) (any, error) {
9896
// sleep 5 seconds
@@ -114,8 +112,6 @@ func Test_Grpc_WaitForInstanceStart_Timeout(t *testing.T) {
114112
}
115113

116114
func Test_Grpc_WaitForInstanceStart_ConnectionResume(t *testing.T) {
117-
t.Skip("TODO: @joshvanl: re-enable after sqlite implementation of new backend funcs")
118-
119115
r := task.NewTaskRegistry()
120116
r.AddOrchestratorN("WaitForInstanceStartThrowsException", func(ctx *task.OrchestrationContext) (any, error) {
121117
// sleep 5 seconds
@@ -150,8 +146,6 @@ func Test_Grpc_WaitForInstanceStart_ConnectionResume(t *testing.T) {
150146
}
151147

152148
func Test_Grpc_HelloOrchestration(t *testing.T) {
153-
t.Skip("TODO: @joshvanl: re-enable after sqlite implementation of new backend funcs")
154-
155149
r := task.NewTaskRegistry()
156150
r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) {
157151
var input string
@@ -191,8 +185,6 @@ func Test_Grpc_HelloOrchestration(t *testing.T) {
191185
}
192186

193187
func Test_Grpc_SuspendResume(t *testing.T) {
194-
t.Skip("TODO: @joshvanl: re-enable after sqlite implementation of new backend funcs")
195-
196188
const eventCount = 10
197189

198190
r := task.NewTaskRegistry()
@@ -245,8 +237,6 @@ func Test_Grpc_SuspendResume(t *testing.T) {
245237
}
246238

247239
func Test_Grpc_Terminate_Recursive(t *testing.T) {
248-
t.Skip("TODO: @joshvanl: re-enable after sqlite implementation of new backend funcs")
249-
250240
delayTime := 4 * time.Second
251241
executedActivity := false
252242
r := task.NewTaskRegistry()
@@ -308,8 +298,6 @@ func Test_Grpc_Terminate_Recursive(t *testing.T) {
308298
}
309299

310300
func Test_Grpc_ReuseInstanceIDIgnore(t *testing.T) {
311-
t.Skip("TODO: @joshvanl: re-enable after sqlite implementation of new backend funcs")
312-
313301
delayTime := 2 * time.Second
314302
r := task.NewTaskRegistry()
315303
r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) {
@@ -358,8 +346,6 @@ func Test_Grpc_ReuseInstanceIDIgnore(t *testing.T) {
358346
}
359347

360348
func Test_Grpc_ReuseInstanceIDTerminate(t *testing.T) {
361-
t.Skip("TODO: @joshvanl: re-enable after sqlite implementation of new backend funcs")
362-
363349
delayTime := 2 * time.Second
364350
r := task.NewTaskRegistry()
365351
r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) {
@@ -408,8 +394,6 @@ func Test_Grpc_ReuseInstanceIDTerminate(t *testing.T) {
408394
}
409395

410396
func Test_Grpc_ReuseInstanceIDError(t *testing.T) {
411-
t.Skip("TODO: @joshvanl: re-enable after sqlite implementation of new backend funcs")
412-
413397
delayTime := 4 * time.Second
414398
r := task.NewTaskRegistry()
415399
r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) {
@@ -443,8 +427,6 @@ func Test_Grpc_ReuseInstanceIDError(t *testing.T) {
443427
}
444428

445429
func Test_Grpc_ActivityRetries(t *testing.T) {
446-
t.Skip("TODO: @joshvanl: re-enable after sqlite implementation of new backend funcs")
447-
448430
r := task.NewTaskRegistry()
449431
r.AddOrchestratorN("ActivityRetries", func(ctx *task.OrchestrationContext) (any, error) {
450432
if err := ctx.CallActivity("FailActivity", task.WithActivityRetryPolicy(&task.RetryPolicy{
@@ -476,8 +458,6 @@ func Test_Grpc_ActivityRetries(t *testing.T) {
476458
}
477459

478460
func Test_Grpc_SubOrchestratorRetries(t *testing.T) {
479-
t.Skip("TODO: @joshvanl: re-enable after sqlite implementation of new backend funcs")
480-
481461
r := task.NewTaskRegistry()
482462
r.AddOrchestratorN("Parent", func(ctx *task.OrchestrationContext) (any, error) {
483463
err := ctx.CallSubOrchestrator(

0 commit comments

Comments
 (0)