44 "context"
55 "database/sql"
66 "embed"
7- _ "embed"
87 "encoding/json"
98 "errors"
109 "fmt"
@@ -74,17 +73,17 @@ type postgresBackend struct {
7473 options * options
7574}
7675
77- func (mb * postgresBackend ) FeatureSupported (feature backend.Feature ) bool {
76+ func (pb * postgresBackend ) FeatureSupported (feature backend.Feature ) bool {
7877 return true
7978}
8079
81- func (mb * postgresBackend ) Close () error {
82- return mb .db .Close ()
80+ func (pb * postgresBackend ) Close () error {
81+ return pb .db .Close ()
8382}
8483
8584// Migrate applies any pending database migrations.
86- func (mb * postgresBackend ) Migrate () error {
87- schemaDsn := mb .dsn
85+ func (pb * postgresBackend ) Migrate () error {
86+ schemaDsn := pb .dsn
8887 db , err := sql .Open ("pgx" , schemaDsn )
8988 if err != nil {
9089 return fmt .Errorf ("opening schema database: %w" , err )
@@ -118,20 +117,20 @@ func (mb *postgresBackend) Migrate() error {
118117 return nil
119118}
120119
121- func (mb * postgresBackend ) Tracer () trace.Tracer {
122- return mb .options .TracerProvider .Tracer (backend .TracerName )
120+ func (pb * postgresBackend ) Tracer () trace.Tracer {
121+ return pb .options .TracerProvider .Tracer (backend .TracerName )
123122}
124123
125- func (mb * postgresBackend ) Metrics () metrics.Client {
126- return mb .options .Metrics .WithTags (metrics.Tags {metrickeys .Backend : "postgres" })
124+ func (pb * postgresBackend ) Metrics () metrics.Client {
125+ return pb .options .Metrics .WithTags (metrics.Tags {metrickeys .Backend : "postgres" })
127126}
128127
129- func (mb * postgresBackend ) Options () * backend.Options {
130- return mb .options .Options
128+ func (pb * postgresBackend ) Options () * backend.Options {
129+ return pb .options .Options
131130}
132131
133- func (mb * postgresBackend ) CreateWorkflowInstance (ctx context.Context , instance * workflow.Instance , event * history.Event ) error {
134- tx , err := mb .db .BeginTx (ctx , & sql.TxOptions {
132+ func (pb * postgresBackend ) CreateWorkflowInstance (ctx context.Context , instance * workflow.Instance , event * history.Event ) error {
133+ tx , err := pb .db .BeginTx (ctx , & sql.TxOptions {
135134 Isolation : sql .LevelReadCommitted ,
136135 })
137136 if err != nil {
@@ -158,21 +157,21 @@ func (mb *postgresBackend) CreateWorkflowInstance(ctx context.Context, instance
158157 return nil
159158}
160159
161- func (mb * postgresBackend ) RemoveWorkflowInstance (ctx context.Context , instance * core.WorkflowInstance ) error {
162- tx , err := mb .db .BeginTx (ctx , nil )
160+ func (pb * postgresBackend ) RemoveWorkflowInstance (ctx context.Context , instance * core.WorkflowInstance ) error {
161+ tx , err := pb .db .BeginTx (ctx , nil )
163162 if err != nil {
164163 return err
165164 }
166165 defer tx .Rollback ()
167166
168- if err := mb .removeWorkflowInstance (ctx , instance , tx ); err != nil {
167+ if err := pb .removeWorkflowInstance (ctx , instance , tx ); err != nil {
169168 return err
170169 }
171170
172171 return tx .Commit ()
173172}
174173
175- func (mb * postgresBackend ) removeWorkflowInstance (ctx context.Context , instance * core.WorkflowInstance , tx * sql.Tx ) error {
174+ func (pb * postgresBackend ) removeWorkflowInstance (ctx context.Context , instance * core.WorkflowInstance , tx * sql.Tx ) error {
176175 row := tx .QueryRowContext (ctx , "SELECT state FROM instances WHERE instance_id = $1 AND execution_id = $2 LIMIT 1" , instance .InstanceID , instance .ExecutionID )
177176 var state core.WorkflowInstanceState
178177 if err := row .Scan (& state ); err != nil {
@@ -202,13 +201,13 @@ func (mb *postgresBackend) removeWorkflowInstance(ctx context.Context, instance
202201 return nil
203202}
204203
205- func (mb * postgresBackend ) RemoveWorkflowInstances (ctx context.Context , options ... backend.RemovalOption ) error {
204+ func (pb * postgresBackend ) RemoveWorkflowInstances (ctx context.Context , options ... backend.RemovalOption ) error {
206205 ro := backend .DefaultRemovalOptions
207206 for _ , opt := range options {
208207 opt (& ro )
209208 }
210209
211- rows , err := mb .db .QueryContext (ctx , "SELECT instance_id, execution_id FROM instances WHERE completed_at < $1" , ro .FinishedBefore )
210+ rows , err := pb .db .QueryContext (ctx , "SELECT instance_id, execution_id FROM instances WHERE completed_at < $1" , ro .FinishedBefore )
212211 if err != nil {
213212 return err
214213 }
@@ -246,7 +245,7 @@ func (mb *postgresBackend) RemoveWorkflowInstances(ctx context.Context, options
246245
247246 batchSize := ro .BatchSize
248247 for i := 0 ; i < len (pairs ); i += batchSize {
249- tx , err := mb .db .BeginTx (ctx , nil )
248+ tx , err := pb .db .BeginTx (ctx , nil )
250249 if err != nil {
251250 return err
252251 }
@@ -286,8 +285,8 @@ func (mb *postgresBackend) RemoveWorkflowInstances(ctx context.Context, options
286285 return nil
287286}
288287
289- func (mb * postgresBackend ) CancelWorkflowInstance (ctx context.Context , instance * workflow.Instance , event * history.Event ) error {
290- tx , err := mb .db .BeginTx (ctx , & sql.TxOptions {
288+ func (pb * postgresBackend ) CancelWorkflowInstance (ctx context.Context , instance * workflow.Instance , event * history.Event ) error {
289+ tx , err := pb .db .BeginTx (ctx , & sql.TxOptions {
291290 Isolation : sql .LevelReadCommitted ,
292291 })
293292 if err != nil {
@@ -313,8 +312,8 @@ func (mb *postgresBackend) CancelWorkflowInstance(ctx context.Context, instance
313312 return tx .Commit ()
314313}
315314
316- func (mb * postgresBackend ) GetWorkflowInstanceHistory (ctx context.Context , instance * workflow.Instance , lastSequenceID * int64 ) ([]* history.Event , error ) {
317- tx , err := mb .db .BeginTx (ctx , nil )
315+ func (pb * postgresBackend ) GetWorkflowInstanceHistory (ctx context.Context , instance * workflow.Instance , lastSequenceID * int64 ) ([]* history.Event , error ) {
316+ tx , err := pb .db .BeginTx (ctx , nil )
318317 if err != nil {
319318 return nil , err
320319 }
@@ -379,8 +378,8 @@ func (mb *postgresBackend) GetWorkflowInstanceHistory(ctx context.Context, insta
379378 return h , nil
380379}
381380
382- func (mb * postgresBackend ) GetWorkflowInstanceState (ctx context.Context , instance * workflow.Instance ) (core.WorkflowInstanceState , error ) {
383- row := mb .db .QueryRowContext (
381+ func (pb * postgresBackend ) GetWorkflowInstanceState (ctx context.Context , instance * workflow.Instance ) (core.WorkflowInstanceState , error ) {
382+ row := pb .db .QueryRowContext (
384383 ctx ,
385384 "SELECT state FROM instances WHERE instance_id = $1 AND execution_id = $2" ,
386385 instance .InstanceID ,
@@ -446,8 +445,8 @@ func createInstance(ctx context.Context, tx *sql.Tx, queue workflow.Queue, wfi *
446445}
447446
448447// SignalWorkflow signals a running workflow instance
449- func (mb * postgresBackend ) SignalWorkflow (ctx context.Context , instanceID string , event * history.Event ) error {
450- tx , err := mb .db .BeginTx (ctx , & sql.TxOptions {
448+ func (pb * postgresBackend ) SignalWorkflow (ctx context.Context , instanceID string , event * history.Event ) error {
449+ tx , err := pb .db .BeginTx (ctx , & sql.TxOptions {
451450 Isolation : sql .LevelReadCommitted ,
452451 })
453452 if err != nil {
@@ -475,20 +474,20 @@ func (mb *postgresBackend) SignalWorkflow(ctx context.Context, instanceID string
475474 return tx .Commit ()
476475}
477476
478- func (mb * postgresBackend ) PrepareWorkflowQueues (ctx context.Context , queues []workflow.Queue ) error {
477+ func (pb * postgresBackend ) PrepareWorkflowQueues (ctx context.Context , queues []workflow.Queue ) error {
479478 return nil
480479}
481480
482- func (mb * postgresBackend ) PrepareActivityQueues (ctx context.Context , queues []workflow.Queue ) error {
481+ func (pb * postgresBackend ) PrepareActivityQueues (ctx context.Context , queues []workflow.Queue ) error {
483482 return nil
484483}
485484
486485// GetWorkflowTask returns a pending workflow task or nil if there are no pending workflow executions
487- func (mb * postgresBackend ) GetWorkflowTask (ctx context.Context , queues []workflow.Queue ) (* backend.WorkflowTask , error ) {
486+ func (pb * postgresBackend ) GetWorkflowTask (ctx context.Context , queues []workflow.Queue ) (* backend.WorkflowTask , error ) {
488487 if len (queues ) == 0 {
489488 return nil , errors .New ("no queues provided" )
490489 }
491- tx , err := mb .db .BeginTx (ctx , & sql.TxOptions {
490+ tx , err := pb .db .BeginTx (ctx , & sql.TxOptions {
492491 Isolation : sql .LevelReadCommitted ,
493492 })
494493 if err != nil {
@@ -502,7 +501,7 @@ func (mb *postgresBackend) GetWorkflowTask(ctx context.Context, queues []workflo
502501 now , // event.visible_at
503502 now , // locked_until
504503 now , // sticky_until
505- mb .workerName , // worker
504+ pb .workerName , // worker
506505 }
507506
508507 for _ , q := range queues {
@@ -545,8 +544,8 @@ func (mb *postgresBackend) GetWorkflowTask(ctx context.Context, queues []workflo
545544 `UPDATE instances i
546545 SET locked_until = $1, worker = $2
547546 WHERE id = $3` ,
548- now .Add (mb .options .WorkflowLockTimeout ),
549- mb .workerName ,
547+ now .Add (pb .options .WorkflowLockTimeout ),
548+ pb .workerName ,
550549 id ,
551550 )
552551 if err != nil {
@@ -660,14 +659,14 @@ func (mb *postgresBackend) GetWorkflowTask(ctx context.Context, queues []workflo
660659// This checkpoints the execution. events are new events from the last workflow execution
661660// which will be added to the workflow instance history. workflowEvents are new events for the
662661// completed or other workflow instances.
663- func (mb * postgresBackend ) CompleteWorkflowTask (
662+ func (pb * postgresBackend ) CompleteWorkflowTask (
664663 ctx context.Context ,
665664 task * backend.WorkflowTask ,
666665 state core.WorkflowInstanceState ,
667666 executedEvents , activityEvents , timerEvents []* history.Event ,
668667 workflowEvents []* history.WorkflowEvent ,
669668) error {
670- tx , err := mb .db .BeginTx (ctx , & sql.TxOptions {
669+ tx , err := pb .db .BeginTx (ctx , & sql.TxOptions {
671670 Isolation : sql .LevelReadCommitted ,
672671 })
673672 if err != nil {
@@ -687,12 +686,12 @@ func (mb *postgresBackend) CompleteWorkflowTask(
687686 res , err := tx .ExecContext (
688687 ctx ,
689688 `UPDATE instances SET locked_until = NULL, sticky_until = $1, completed_at = $2, state = $3 WHERE instance_id = $4 AND execution_id = $5 AND worker = $6 AND locked_until IS NOT NULL` ,
690- time .Now ().Add (mb .options .StickyTimeout ),
689+ time .Now ().Add (pb .options .StickyTimeout ),
691690 completedAt ,
692691 state ,
693692 instance .InstanceID ,
694693 instance .ExecutionID ,
695- mb .workerName ,
694+ pb .workerName ,
696695 )
697696 if err != nil {
698697 return fmt .Errorf ("unlocking instance: %w" , err )
@@ -797,8 +796,8 @@ func (mb *postgresBackend) CompleteWorkflowTask(
797796 }
798797 }
799798
800- if mb .options .RemoveContinuedAsNewInstances && state == core .WorkflowInstanceStateContinuedAsNew {
801- if err := mb .removeWorkflowInstance (ctx , instance , tx ); err != nil {
799+ if pb .options .RemoveContinuedAsNewInstances && state == core .WorkflowInstanceStateContinuedAsNew {
800+ if err := pb .removeWorkflowInstance (ctx , instance , tx ); err != nil {
802801 return fmt .Errorf ("removing old instance: %w" , err )
803802 }
804803 }
@@ -810,21 +809,21 @@ func (mb *postgresBackend) CompleteWorkflowTask(
810809 return nil
811810}
812811
813- func (mb * postgresBackend ) ExtendWorkflowTask (ctx context.Context , task * backend.WorkflowTask ) error {
814- tx , err := mb .db .BeginTx (ctx , nil )
812+ func (pb * postgresBackend ) ExtendWorkflowTask (ctx context.Context , task * backend.WorkflowTask ) error {
813+ tx , err := pb .db .BeginTx (ctx , nil )
815814 if err != nil {
816815 return err
817816 }
818817 defer tx .Rollback ()
819818
820- until := time .Now ().Add (mb .options .WorkflowLockTimeout )
819+ until := time .Now ().Add (pb .options .WorkflowLockTimeout )
821820 res , err := tx .ExecContext (
822821 ctx ,
823822 `UPDATE instances SET locked_until = $1 WHERE instance_id = $2 AND execution_id = $3 AND worker = $4` ,
824823 until ,
825824 task .WorkflowInstance .InstanceID ,
826825 task .WorkflowInstance .ExecutionID ,
827- mb .workerName ,
826+ pb .workerName ,
828827 )
829828 if err != nil {
830829 return fmt .Errorf ("extending workflow task lock: %w" , err )
@@ -840,11 +839,11 @@ func (mb *postgresBackend) ExtendWorkflowTask(ctx context.Context, task *backend
840839}
841840
842841// GetActivityTask returns a pending activity task or nil if there are no pending activities
843- func (mb * postgresBackend ) GetActivityTask (ctx context.Context , queues []workflow.Queue ) (* backend.ActivityTask , error ) {
842+ func (pb * postgresBackend ) GetActivityTask (ctx context.Context , queues []workflow.Queue ) (* backend.ActivityTask , error ) {
844843 if len (queues ) == 0 {
845844 return nil , errors .New ("no queues provided" )
846845 }
847- tx , err := mb .db .BeginTx (ctx , & sql.TxOptions {
846+ tx , err := pb .db .BeginTx (ctx , & sql.TxOptions {
848847 Isolation : sql .LevelReadCommitted ,
849848 })
850849 if err != nil {
@@ -899,8 +898,8 @@ func (mb *postgresBackend) GetActivityTask(ctx context.Context, queues []workflo
899898 if _ , err := tx .ExecContext (
900899 ctx ,
901900 `UPDATE activities SET locked_until = $1, worker = $2 WHERE id = $3` ,
902- now .Add (mb .options .ActivityLockTimeout ),
903- mb .workerName ,
901+ now .Add (pb .options .ActivityLockTimeout ),
902+ pb .workerName ,
904903 id ,
905904 ); err != nil {
906905 return nil , fmt .Errorf ("locking activity: %w" , err )
@@ -922,8 +921,8 @@ func (mb *postgresBackend) GetActivityTask(ctx context.Context, queues []workflo
922921}
923922
924923// CompleteActivityTask completes a activity task retrieved using GetActivityTask
925- func (mb * postgresBackend ) CompleteActivityTask (ctx context.Context , task * backend.ActivityTask , result * history.Event ) error {
926- tx , err := mb .db .BeginTx (ctx , & sql.TxOptions {
924+ func (pb * postgresBackend ) CompleteActivityTask (ctx context.Context , task * backend.ActivityTask , result * history.Event ) error {
925+ tx , err := pb .db .BeginTx (ctx , & sql.TxOptions {
927926 Isolation : sql .LevelReadCommitted ,
928927 })
929928 if err != nil {
@@ -938,7 +937,7 @@ func (mb *postgresBackend) CompleteActivityTask(ctx context.Context, task *backe
938937 task .ActivityID ,
939938 task .WorkflowInstance .InstanceID ,
940939 task .WorkflowInstance .ExecutionID ,
941- mb .workerName ,
940+ pb .workerName ,
942941 task .Queue ,
943942 ); err != nil {
944943 return fmt .Errorf ("completing activity: %w" , err )
@@ -965,20 +964,20 @@ func (mb *postgresBackend) CompleteActivityTask(ctx context.Context, task *backe
965964 return nil
966965}
967966
968- func (mb * postgresBackend ) ExtendActivityTask (ctx context.Context , task * backend.ActivityTask ) error {
969- tx , err := mb .db .BeginTx (ctx , nil )
967+ func (pb * postgresBackend ) ExtendActivityTask (ctx context.Context , task * backend.ActivityTask ) error {
968+ tx , err := pb .db .BeginTx (ctx , nil )
970969 if err != nil {
971970 return err
972971 }
973972 defer tx .Rollback ()
974973
975- until := time .Now ().Add (mb .options .ActivityLockTimeout )
974+ until := time .Now ().Add (pb .options .ActivityLockTimeout )
976975 _ , err = tx .ExecContext (
977976 ctx ,
978977 `UPDATE activities SET locked_until = $1 WHERE activity_id = $2 AND worker = $3` ,
979978 until ,
980979 task .ActivityID ,
981- mb .workerName ,
980+ pb .workerName ,
982981 )
983982 if err != nil {
984983 return fmt .Errorf ("extending activity lock: %w" , err )
0 commit comments