@@ -3,6 +3,7 @@ package dbos
33import (
44 "context"
55 _ "embed"
6+ "encoding/json"
67 "errors"
78 "fmt"
89 "log/slog"
@@ -16,7 +17,6 @@ import (
1617 "github.com/google/uuid"
1718 "github.com/jackc/pgx/v5"
1819 "github.com/jackc/pgx/v5/pgconn"
19- "github.com/jackc/pgx/v5/pgtype"
2020 "github.com/jackc/pgx/v5/pgxpool"
2121)
2222
@@ -483,14 +483,18 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt
483483 var result insertWorkflowResult
484484 var timeoutMSResult * int64
485485 var workflowDeadlineEpochMS * int64
486+
487+ // Marshal authenticated roles (slice of strings) to JSON for TEXT column
488+ authenticatedRoles , _ := json .Marshal (input .status .AuthenticatedRoles )
489+
486490 err = input .tx .QueryRow (ctx , query ,
487491 input .status .ID ,
488492 input .status .Status ,
489493 input .status .Name ,
490494 input .status .QueueName ,
491495 input .status .AuthenticatedUser ,
492496 input .status .AssumedRole ,
493- pgtype. Array [ string ]{ Elements : input . status . AuthenticatedRoles } ,
497+ authenticatedRoles ,
494498 input .status .ExecutorID ,
495499 applicationVersion ,
496500 input .status .ApplicationID ,
@@ -703,11 +707,12 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) (
703707 var deduplicationID * string
704708 var applicationVersion * string
705709 var executorID * string
710+ var authenticatedRoles * string
706711
707712 // Build scan arguments dynamically based on loaded columns
708713 scanArgs := []any {
709714 & wf .ID , & wf .Status , & wf .Name , & wf .AuthenticatedUser , & wf .AssumedRole ,
710- & wf . AuthenticatedRoles , & executorID , & createdAtMs ,
715+ & authenticatedRoles , & executorID , & createdAtMs ,
711716 & updatedAtMs , & applicationVersion , & wf .ApplicationID ,
712717 & wf .Attempts , & queueName , & timeoutMs ,
713718 & deadlineMs , & startedAtMs , & deduplicationID , & wf .Priority ,
@@ -720,6 +725,12 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) (
720725 scanArgs = append (scanArgs , & inputString )
721726 }
722727
728+ if authenticatedRoles != nil && * authenticatedRoles != "" {
729+ if err := json .Unmarshal ([]byte (* authenticatedRoles ), & wf .AuthenticatedRoles ); err != nil {
730+ return nil , fmt .Errorf ("failed to unmarshal authenticated_roles: %w" , err )
731+ }
732+ }
733+
723734 err := rows .Scan (scanArgs ... )
724735 if err != nil {
725736 return nil , fmt .Errorf ("failed to scan workflow row: %w" , err )
@@ -1086,13 +1097,16 @@ func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) (st
10861097 return "" , fmt .Errorf ("failed to serialize input: %w" , err )
10871098 }
10881099
1100+ // Marshal authenticated roles (slice of strings) to JSON for TEXT column
1101+ authenticatedRoles , _ := json .Marshal (originalWorkflow .AuthenticatedRoles )
1102+
10891103 _ , err = tx .Exec (ctx , insertQuery ,
10901104 forkedWorkflowID ,
10911105 WorkflowStatusEnqueued ,
10921106 originalWorkflow .Name ,
10931107 originalWorkflow .AuthenticatedUser ,
10941108 originalWorkflow .AssumedRole ,
1095- originalWorkflow . AuthenticatedRoles ,
1109+ authenticatedRoles ,
10961110 & appVersion ,
10971111 originalWorkflow .ApplicationID ,
10981112 _DBOS_INTERNAL_QUEUE_NAME ,
0 commit comments