Skip to content

Commit 0129f41

Browse files
authored
Rename metadata
1 parent f4e4a1c commit 0129f41

26 files changed

+119
-64
lines changed

backend/mock_Backend.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/mysql/mysql.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
342342
wfi = core.NewWorkflowInstance(instanceID, executionID)
343343
}
344344

345-
var metadata *core.WorkflowInstanceMetadata
345+
var metadata *core.WorkflowMetadata
346346
if metadataJson.Valid {
347347
if err := json.Unmarshal([]byte(metadataJson.String), &metadata); err != nil {
348348
return nil, fmt.Errorf("parsing workflow metadata: %w", err)
@@ -630,7 +630,7 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
630630
t := &task.Activity{
631631
ID: event.ID,
632632
WorkflowInstance: core.NewWorkflowInstance(instanceID, executionID),
633-
WorkflowMetadata: metadata,
633+
Metadata: metadata,
634634
Event: event,
635635
}
636636

backend/redis/activity.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func (rb *redisBackend) GetActivityTask(ctx context.Context) (*task.Activity, er
2626

2727
return &task.Activity{
2828
WorkflowInstance: activityTask.Data.Instance,
29-
WorkflowMetadata: instanceState.Metadata,
29+
Metadata: instanceState.Metadata,
3030
ID: activityTask.TaskID, // Use the queue generated ID here
3131
Event: activityTask.Data.Event,
3232
}, nil

backend/redis/instance.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,15 +107,15 @@ type instanceState struct {
107107
Instance *core.WorkflowInstance `json:"instance,omitempty"`
108108
State backend.WorkflowState `json:"state,omitempty"`
109109

110-
Metadata *core.WorkflowInstanceMetadata `json:"metadata,omitempty"`
110+
Metadata *core.WorkflowMetadata `json:"metadata,omitempty"`
111111

112112
CreatedAt time.Time `json:"created_at,omitempty"`
113113
CompletedAt *time.Time `json:"completed_at,omitempty"`
114114

115115
LastSequenceID int64 `json:"last_sequence_id,omitempty"`
116116
}
117117

118-
func createInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, metadata *core.WorkflowInstanceMetadata, ignoreDuplicate bool) error {
118+
func createInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, metadata *core.WorkflowMetadata, ignoreDuplicate bool) error {
119119
key := instanceKey(instance.InstanceID)
120120

121121
createdAt := time.Now()

backend/sqlite/sqlite.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func (sb *sqliteBackend) CreateWorkflowInstance(ctx context.Context, instance *w
8282
}
8383

8484
// Initial history is empty, store only new events
85-
if err := insertPendingEvents(ctx, tx, m.WorkflowInstance.InstanceID, []history.Event{m.HistoryEvent}); err != nil {
85+
if err := insertPendingEvents(ctx, tx, instance.InstanceID, []history.Event{event}); err != nil {
8686
return fmt.Errorf("inserting new event: %w", err)
8787
}
8888

@@ -274,7 +274,7 @@ func (sb *sqliteBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, e
274274
wfi = core.NewWorkflowInstance(instanceID, executionID)
275275
}
276276

277-
var metadata *core.WorkflowInstanceMetadata
277+
var metadata *core.WorkflowMetadata
278278
if metadataJson.Valid {
279279
if err := json.Unmarshal([]byte(metadataJson.String), &metadata); err != nil {
280280
return nil, fmt.Errorf("parsing workflow metadata: %w", err)
@@ -511,7 +511,7 @@ func (sb *sqliteBackend) GetActivityTask(ctx context.Context) (*task.Activity, e
511511
t := &task.Activity{
512512
ID: event.ID,
513513
WorkflowInstance: core.NewWorkflowInstance(instanceID, executionID),
514-
WorkflowMetadata: metadata,
514+
Metadata: metadata,
515515
Event: event,
516516
}
517517

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ require (
1212
github.com/stretchr/testify v1.7.1
1313
go.opentelemetry.io/otel v1.7.0
1414
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.7.0
15-
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.7.0
1615
go.opentelemetry.io/otel/trace v1.7.0
1716
golang.org/x/tools v0.1.10
1817
)

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -797,8 +797,6 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.7.0 h1:cMDtmgJ5FpRvqx9x2Aq+
797797
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.7.0/go.mod h1:ceUgdyfNv4h4gLxHR0WNfDiiVmZFodZhZSbOLhpxqXE=
798798
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.7.0 h1:pLP0MH4MAqeTEV0g/4flxw9O8Is48uAIauAnjznbW50=
799799
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.7.0/go.mod h1:aFXT9Ng2seM9eizF+LfKiyPBGy8xIZKwhusC1gIu3hA=
800-
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.7.0 h1:8hPcgCg0rUJiKE6VWahRvjgLUrNl7rW2hffUEPKXVEM=
801-
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.7.0/go.mod h1:K4GDXPY6TjUiwbOh+DkKaEdCF8y+lvMoM6SeAPyfCCM=
802800
go.opentelemetry.io/otel/sdk v1.7.0 h1:4OmStpcKVOfvDOgCt7UriAPtKolwIhxpnSNI/yK+1B0=
803801
go.opentelemetry.io/otel/sdk v1.7.0/go.mod h1:uTEOTwaqIVuTGiJN7ii13Ibp75wJmYUDe374q6cZwUU=
804802
go.opentelemetry.io/otel/trace v1.7.0 h1:O37Iogk1lEkMRXewVtZ1BBTVn5JEp8GrJvP92bJqC6o=

internal/activity/executor.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,21 +57,20 @@ func (e *Executor) ExecuteActivity(ctx context.Context, task *task.Activity) (pa
5757
e.logger)
5858
activityCtx := WithActivityState(ctx, as)
5959

60-
activityCtx = tracing.UnmarshalSpan(activityCtx, task.WorkflowMetadata)
60+
activityCtx = tracing.UnmarshalSpan(activityCtx, task.Metadata)
6161
activityCtx, span := e.tracer.Start(activityCtx, "ActivityTaskExecution", trace.WithAttributes(
6262
attribute.String("activity", a.Name),
6363
attribute.String(tracing.WorkflowInstanceID, task.WorkflowInstance.InstanceID),
6464
attribute.String(tracing.ActivityTaskID, task.ID),
6565
))
66+
defer span.End()
6667

6768
// Execute activity
6869
if addContext {
6970
args[0] = reflect.ValueOf(activityCtx)
7071
}
7172
r := activityFn.Call(args)
7273

73-
defer span.End()
74-
7574
if len(r) < 1 || len(r) > 2 {
7675
return nil, errors.New("activity has to return either (error) or (<result>, error)")
7776
}

internal/command/schedule_subworkflow.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,16 @@ type ScheduleSubWorkflowCommand struct {
1212
command
1313

1414
Instance *core.WorkflowInstance
15-
Name string
16-
Inputs []payload.Payload
15+
Metadata *core.WorkflowMetadata
16+
17+
Name string
18+
Inputs []payload.Payload
1719
}
1820

1921
var _ Command = (*ScheduleSubWorkflowCommand)(nil)
2022

2123
func NewScheduleSubWorkflowCommand(
22-
id int64, parentInstance *core.WorkflowInstance, subWorkflowInstanceID, name string, inputs []payload.Payload,
24+
id int64, parentInstance *core.WorkflowInstance, subWorkflowInstanceID, name string, inputs []payload.Payload, metadata *core.WorkflowMetadata,
2325
) *ScheduleSubWorkflowCommand {
2426
if subWorkflowInstanceID == "" {
2527
subWorkflowInstanceID = uuid.New().String()
@@ -32,6 +34,7 @@ func NewScheduleSubWorkflowCommand(
3234
},
3335

3436
Instance: core.NewSubWorkflowInstance(subWorkflowInstanceID, uuid.NewString(), parentInstance.InstanceID, id),
37+
Metadata: metadata,
3538

3639
Name: name,
3740
Inputs: inputs,
@@ -45,8 +48,6 @@ func (*ScheduleSubWorkflowCommand) Type() string {
4548
func (c *ScheduleSubWorkflowCommand) Commit(clock clock.Clock) *CommandResult {
4649
c.commit()
4750

48-
// TODO: TRACING: Add span
49-
5051
return &CommandResult{
5152
// Record scheduled sub-workflow
5253
Events: []history.Event{
@@ -55,6 +56,7 @@ func (c *ScheduleSubWorkflowCommand) Commit(clock clock.Clock) *CommandResult {
5556
history.EventType_SubWorkflowScheduled,
5657
&history.SubWorkflowScheduledAttributes{
5758
SubWorkflowInstance: c.Instance,
59+
Metadata: c.Metadata,
5860
Name: c.Name,
5961
Inputs: c.Inputs,
6062
},
@@ -69,8 +71,9 @@ func (c *ScheduleSubWorkflowCommand) Commit(clock clock.Clock) *CommandResult {
6971
clock.Now(),
7072
history.EventType_WorkflowExecutionStarted,
7173
&history.ExecutionStartedAttributes{
72-
Name: c.Name,
73-
Inputs: c.Inputs,
74+
Name: c.Name,
75+
Inputs: c.Inputs,
76+
Metadata: c.Metadata,
7477
},
7578
history.ScheduleEventID(0),
7679
),

internal/core/metadata.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
package core
22

3-
type WorkflowInstanceMetadata map[string]string
3+
type WorkflowMetadata map[string]string
44

5-
func (wim WorkflowInstanceMetadata) Get(key string) string {
5+
func (wim WorkflowMetadata) Get(key string) string {
66
return wim[key]
77
}
88

9-
func (wim WorkflowInstanceMetadata) Set(key string, value string) {
9+
func (wim WorkflowMetadata) Set(key string, value string) {
1010
wim[key] = value
1111
}
1212

13-
func (wim WorkflowInstanceMetadata) Keys() []string {
13+
func (wim WorkflowMetadata) Keys() []string {
1414
r := make([]string, 0, len(wim))
1515

1616
for k := range wim {

0 commit comments

Comments
 (0)