diff --git a/new_samples/consistentquery/consistentquery.go b/new_samples/consistentquery/consistentquery.go new file mode 100644 index 0000000..04c2620 --- /dev/null +++ b/new_samples/consistentquery/consistentquery.go @@ -0,0 +1,47 @@ +package main + +import ( + "time" + + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +// ConsistentQueryWorkflow demonstrates query handlers with signal handling. +func ConsistentQueryWorkflow(ctx workflow.Context) error { + queryResult := 0 + logger := workflow.GetLogger(ctx) + logger.Info("ConsistentQueryWorkflow started") + + // Setup query handler for "state" query type + err := workflow.SetQueryHandler(ctx, "state", func(input []byte) (int, error) { + return queryResult, nil + }) + if err != nil { + logger.Info("SetQueryHandler failed: " + err.Error()) + return err + } + + signalChan := workflow.GetSignalChannel(ctx, "increase") + + s := workflow.NewSelector(ctx) + s.AddReceive(signalChan, func(c workflow.Channel, more bool) { + c.Receive(ctx, nil) + queryResult += 1 + workflow.GetLogger(ctx).Info("Received signal!", zap.String("signal", "increase")) + }) + + workflow.Go(ctx, func(ctx workflow.Context) { + for { + s.Select(ctx) + } + }) + + // Wait for timer before completing + workflow.NewTimer(ctx, time.Minute*2).Get(ctx, nil) + logger.Info("Timer fired") + + logger.Info("ConsistentQueryWorkflow completed") + return nil +} + diff --git a/new_samples/consistentquery/generator/README.md b/new_samples/consistentquery/generator/README.md new file mode 100644 index 0000000..1da3502 --- /dev/null +++ b/new_samples/consistentquery/generator/README.md @@ -0,0 +1,23 @@ + + + +# Sample Generator + +This folder is NOT part of the actual sample. It exists only for contributors who work on this sample. Please disregard it if you are trying to learn about Cadence. + +To create a better learning experience for Cadence users, each sample folder is designed to be self contained. Users can view every part of writing and running workflows, including: + +* Cadence client initialization +* Worker with workflow and activity registrations +* Workflow starter +* and the workflow code itself + +Some samples may have more or fewer parts depending on what they need to demonstrate. + +In most cases, the workflow code (e.g. `workflow.go`) is the part that users care about. The rest is boilerplate needed to run that workflow. For each sample folder, the workflow code should be written by hand. The boilerplate can be generated. Keeping all parts inside one folder gives early learners more value because they can see everything together rather than jumping across directories. + +## Contributing + +* When creating a new sample, follow the steps mentioned in the README file in the main samples folder. +* To update the sample workflow code, edit the workflow file directly. +* To update the worker, client, or other boilerplate logic, edit the generator file. If your change applies to all samples, update the common generator file inside the `template` folder. Edit the generator file in this folder only when the change should affect this sample alone. diff --git a/new_samples/consistentquery/generator/README_specific.md b/new_samples/consistentquery/generator/README_specific.md new file mode 100644 index 0000000..90e749f --- /dev/null +++ b/new_samples/consistentquery/generator/README_specific.md @@ -0,0 +1,53 @@ +## Consistent Query Sample + +This sample demonstrates **consistent queries** with signal handling. + +### Start the Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 180 \ + --workflow_type cadence_samples.ConsistentQueryWorkflow +``` + +### Query the Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow query \ + --wid \ + --qt state +``` + +### Send Signals to Update State + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow signal \ + --wid \ + --name increase +``` + +Each signal increments the counter. Query to see the updated value. + +### Key Concept: Query + Signal + +```go +queryResult := 0 + +// Register query handler +workflow.SetQueryHandler(ctx, "state", func() (int, error) { + return queryResult, nil +}) + +// Handle signals that modify state +signalChan := workflow.GetSignalChannel(ctx, "increase") +signalChan.Receive(ctx, nil) +queryResult += 1 // State changes are visible to queries +``` + diff --git a/new_samples/consistentquery/generator/generate.go b/new_samples/consistentquery/generator/generate.go new file mode 100644 index 0000000..c7b1b04 --- /dev/null +++ b/new_samples/consistentquery/generator/generate.go @@ -0,0 +1,13 @@ +package main + +import "github.com/uber-common/cadence-samples/new_samples/template" + +func main() { + data := template.TemplateData{ + SampleName: "Consistent Query", + Workflows: []string{"ConsistentQueryWorkflow"}, + Activities: []string{}, + } + template.GenerateAll(data) +} + diff --git a/new_samples/consistentquery/main.go b/new_samples/consistentquery/main.go new file mode 100644 index 0000000..5893999 --- /dev/null +++ b/new_samples/consistentquery/main.go @@ -0,0 +1,20 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + StartWorker() + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT) + fmt.Println("Cadence worker started, press ctrl+c to terminate...") + <-done +} diff --git a/new_samples/consistentquery/worker.go b/new_samples/consistentquery/worker.go new file mode 100644 index 0000000..eae2614 --- /dev/null +++ b/new_samples/consistentquery/worker.go @@ -0,0 +1,100 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +// Package worker implements a Cadence worker with basic configurations. +package main + +import ( + "github.com/uber-go/tally" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + + "go.uber.org/cadence/compatibility" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + HostPort = "127.0.0.1:7833" + Domain = "cadence-samples" + // TaskListName identifies set of client workflows, activities, and workers. + // It could be your group or client or application name. + TaskListName = "cadence-samples-worker" + ClientName = "cadence-samples-worker" + CadenceService = "cadence-frontend" +) + +// StartWorker creates and starts a basic Cadence worker. +func StartWorker() { + logger, cadenceClient := BuildLogger(), BuildCadenceClient() + workerOptions := worker.Options{ + Logger: logger, + MetricsScope: tally.NewTestScope(TaskListName, nil), + } + + w := worker.New( + cadenceClient, + Domain, + TaskListName, + workerOptions) + // HelloWorld workflow registration + w.RegisterWorkflowWithOptions(ConsistentQueryWorkflow, workflow.RegisterOptions{Name: "cadence_samples.ConsistentQueryWorkflow"}) + + err := w.Start() + if err != nil { + panic("Failed to start worker: " + err.Error()) + } + logger.Info("Started Worker.", zap.String("worker", TaskListName)) + +} + +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + // Create a single peer chooser that identifies the host/port and configures + // a gRPC dialer with TLS credentials + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: ClientName, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + panic("Failed to start dispatcher: " + err.Error()) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + // Create a compatibility adapter that wraps proto-based YARPC clients + // to provide a unified interface for domain, workflow, worker, and visibility APIs + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) +} + +func BuildLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level.SetLevel(zapcore.InfoLevel) + + var err error + logger, err := config.Build() + if err != nil { + panic("Failed to setup logger: " + err.Error()) + } + + return logger +} diff --git a/new_samples/crossdomain/crossdomain.go b/new_samples/crossdomain/crossdomain.go new file mode 100644 index 0000000..1458799 --- /dev/null +++ b/new_samples/crossdomain/crossdomain.go @@ -0,0 +1,73 @@ +package main + +import ( + "context" + "time" + + "github.com/google/uuid" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +// Configuration for cross-domain execution +const ( + ChildDomain = "child-domain" // Must be registered separately + ChildTaskList = "child-task-list" +) + +// CrossDomainData is passed between workflows +type CrossDomainData struct { + Value string +} + +// CrossDomainWorkflow demonstrates executing child workflows in different domains. +func CrossDomainWorkflow(ctx workflow.Context) error { + logger := workflow.GetLogger(ctx) + logger.Info("CrossDomainWorkflow started") + + // Execute child workflow in a different domain + childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{ + Domain: ChildDomain, + WorkflowID: "child-wf-" + uuid.New().String(), + TaskList: ChildTaskList, + ExecutionStartToCloseTimeout: time.Minute, + }) + + err := workflow.ExecuteChildWorkflow(childCtx, ChildDomainWorkflow, CrossDomainData{Value: "test"}).Get(ctx, nil) + if err != nil { + logger.Error("Child workflow failed", zap.Error(err)) + return err + } + + logger.Info("CrossDomainWorkflow completed") + return nil +} + +// ChildDomainWorkflow runs in the child domain. +func ChildDomainWorkflow(ctx workflow.Context, data CrossDomainData) error { + logger := workflow.GetLogger(ctx) + logger.Info("ChildDomainWorkflow started", zap.String("value", data.Value)) + + ao := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + err := workflow.ExecuteActivity(ctx, ChildDomainActivity).Get(ctx, nil) + if err != nil { + return err + } + + logger.Info("ChildDomainWorkflow completed") + return nil +} + +// ChildDomainActivity runs in the child domain. +func ChildDomainActivity(ctx context.Context) (string, error) { + logger := activity.GetLogger(ctx) + logger.Info("ChildDomainActivity running") + return "Hello from child domain!", nil +} + diff --git a/new_samples/crossdomain/generator/README.md b/new_samples/crossdomain/generator/README.md new file mode 100644 index 0000000..1da3502 --- /dev/null +++ b/new_samples/crossdomain/generator/README.md @@ -0,0 +1,23 @@ + + + +# Sample Generator + +This folder is NOT part of the actual sample. It exists only for contributors who work on this sample. Please disregard it if you are trying to learn about Cadence. + +To create a better learning experience for Cadence users, each sample folder is designed to be self contained. Users can view every part of writing and running workflows, including: + +* Cadence client initialization +* Worker with workflow and activity registrations +* Workflow starter +* and the workflow code itself + +Some samples may have more or fewer parts depending on what they need to demonstrate. + +In most cases, the workflow code (e.g. `workflow.go`) is the part that users care about. The rest is boilerplate needed to run that workflow. For each sample folder, the workflow code should be written by hand. The boilerplate can be generated. Keeping all parts inside one folder gives early learners more value because they can see everything together rather than jumping across directories. + +## Contributing + +* When creating a new sample, follow the steps mentioned in the README file in the main samples folder. +* To update the sample workflow code, edit the workflow file directly. +* To update the worker, client, or other boilerplate logic, edit the generator file. If your change applies to all samples, update the common generator file inside the `template` folder. Edit the generator file in this folder only when the change should affect this sample alone. diff --git a/new_samples/crossdomain/generator/README_specific.md b/new_samples/crossdomain/generator/README_specific.md new file mode 100644 index 0000000..aedb470 --- /dev/null +++ b/new_samples/crossdomain/generator/README_specific.md @@ -0,0 +1,48 @@ +## Cross Domain Sample + +This sample demonstrates executing **child workflows across different Cadence domains**. + +### Prerequisites + +Register a second domain for the child workflow: + +```bash +cadence --env development --domain child-domain domain register +``` + +Start a worker in the child domain (separate terminal): + +```bash +# Worker for child-domain would need to be configured separately +``` + +### Start the Parent Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 120 \ + --workflow_type cadence_samples.CrossDomainWorkflow +``` + +### Key Concept: Cross-Domain Child Options + +```go +childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{ + Domain: "other-domain", // Different domain! + WorkflowID: "child-wf-123", + TaskList: "other-task-list", + ExecutionStartToCloseTimeout: time.Minute, +}) + +workflow.ExecuteChildWorkflow(childCtx, ChildWorkflow, args...) +``` + +### Use Cases + +- Multi-tenant architectures +- Isolation between teams/services +- Cross-cluster workflow execution + diff --git a/new_samples/crossdomain/generator/generate.go b/new_samples/crossdomain/generator/generate.go new file mode 100644 index 0000000..73fb717 --- /dev/null +++ b/new_samples/crossdomain/generator/generate.go @@ -0,0 +1,13 @@ +package main + +import "github.com/uber-common/cadence-samples/new_samples/template" + +func main() { + data := template.TemplateData{ + SampleName: "Cross Domain", + Workflows: []string{"CrossDomainWorkflow", "ChildDomainWorkflow"}, + Activities: []string{"ChildDomainActivity"}, + } + template.GenerateAll(data) +} + diff --git a/new_samples/crossdomain/main.go b/new_samples/crossdomain/main.go new file mode 100644 index 0000000..5893999 --- /dev/null +++ b/new_samples/crossdomain/main.go @@ -0,0 +1,20 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + StartWorker() + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT) + fmt.Println("Cadence worker started, press ctrl+c to terminate...") + <-done +} diff --git a/new_samples/crossdomain/worker.go b/new_samples/crossdomain/worker.go new file mode 100644 index 0000000..50173ef --- /dev/null +++ b/new_samples/crossdomain/worker.go @@ -0,0 +1,102 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +// Package worker implements a Cadence worker with basic configurations. +package main + +import ( + "github.com/uber-go/tally" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/compatibility" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + HostPort = "127.0.0.1:7833" + Domain = "cadence-samples" + // TaskListName identifies set of client workflows, activities, and workers. + // It could be your group or client or application name. + TaskListName = "cadence-samples-worker" + ClientName = "cadence-samples-worker" + CadenceService = "cadence-frontend" +) + +// StartWorker creates and starts a basic Cadence worker. +func StartWorker() { + logger, cadenceClient := BuildLogger(), BuildCadenceClient() + workerOptions := worker.Options{ + Logger: logger, + MetricsScope: tally.NewTestScope(TaskListName, nil), + } + + w := worker.New( + cadenceClient, + Domain, + TaskListName, + workerOptions) + // HelloWorld workflow registration + w.RegisterWorkflowWithOptions(CrossDomainWorkflow, workflow.RegisterOptions{Name: "cadence_samples.CrossDomainWorkflow"}) + w.RegisterWorkflowWithOptions(ChildDomainWorkflow, workflow.RegisterOptions{Name: "cadence_samples.ChildDomainWorkflow"}) + w.RegisterActivityWithOptions(ChildDomainActivity, activity.RegisterOptions{Name: "cadence_samples.ChildDomainActivity"}) + + err := w.Start() + if err != nil { + panic("Failed to start worker: " + err.Error()) + } + logger.Info("Started Worker.", zap.String("worker", TaskListName)) + +} + +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + // Create a single peer chooser that identifies the host/port and configures + // a gRPC dialer with TLS credentials + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: ClientName, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + panic("Failed to start dispatcher: " + err.Error()) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + // Create a compatibility adapter that wraps proto-based YARPC clients + // to provide a unified interface for domain, workflow, worker, and visibility APIs + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) +} + +func BuildLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level.SetLevel(zapcore.InfoLevel) + + var err error + logger, err := config.Build() + if err != nil { + panic("Failed to setup logger: " + err.Error()) + } + + return logger +} diff --git a/new_samples/ctxpropagation/ctxpropagation.go b/new_samples/ctxpropagation/ctxpropagation.go new file mode 100644 index 0000000..f7eba10 --- /dev/null +++ b/new_samples/ctxpropagation/ctxpropagation.go @@ -0,0 +1,139 @@ +package main + +import ( + "context" + "encoding/json" + "time" + + "go.uber.org/cadence/.gen/go/shared" + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +// contextKey is an unexported type used as key for items stored in the Context object +type contextKey struct{} + +// propagateKey is the key used to store the value in the Context object +var propagateKey = contextKey{} + +// propagationKey is the key used by the propagator to pass values through the cadence server headers +const propagationKey = "_prop" + +// Values is a struct holding values to propagate +type Values struct { + Key string `json:"key"` + Value string `json:"value"` +} + +// propagator implements the custom context propagator +type propagator struct{} + +// NewContextPropagator returns a context propagator that propagates a set of +// string key-value pairs across a workflow +func NewContextPropagator() workflow.ContextPropagator { + return &propagator{} +} + +// Inject injects values from context into headers for propagation +func (s *propagator) Inject(ctx context.Context, writer workflow.HeaderWriter) error { + value := ctx.Value(propagateKey) + payload, err := json.Marshal(value) + if err != nil { + return err + } + writer.Set(propagationKey, payload) + return nil +} + +// InjectFromWorkflow injects values from workflow context into headers +func (s *propagator) InjectFromWorkflow(ctx workflow.Context, writer workflow.HeaderWriter) error { + value := ctx.Value(propagateKey) + payload, err := json.Marshal(value) + if err != nil { + return err + } + writer.Set(propagationKey, payload) + return nil +} + +// Extract extracts values from headers and puts them into context +func (s *propagator) Extract(ctx context.Context, reader workflow.HeaderReader) (context.Context, error) { + if err := reader.ForEachKey(func(key string, value []byte) error { + if key == propagationKey { + var values Values + if err := json.Unmarshal(value, &values); err != nil { + return err + } + ctx = context.WithValue(ctx, propagateKey, values) + } + return nil + }); err != nil { + return nil, err + } + return ctx, nil +} + +// ExtractToWorkflow extracts values from headers and puts them into workflow context +func (s *propagator) ExtractToWorkflow(ctx workflow.Context, reader workflow.HeaderReader) (workflow.Context, error) { + if err := reader.ForEachKey(func(key string, value []byte) error { + if key == propagationKey { + var values Values + if err := json.Unmarshal(value, &values); err != nil { + return err + } + ctx = workflow.WithValue(ctx, propagateKey, values) + } + return nil + }); err != nil { + return nil, err + } + return ctx, nil +} + +// SetValuesInHeader places the Values container inside the header +func SetValuesInHeader(values Values, header *shared.Header) error { + payload, err := json.Marshal(values) + if err == nil { + header.Fields[propagationKey] = payload + } else { + return err + } + return nil +} + +// CtxPropagationWorkflow demonstrates custom context propagation. +func CtxPropagationWorkflow(ctx workflow.Context) error { + ao := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Second * 5, + StartToCloseTimeout: time.Minute, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + logger := workflow.GetLogger(ctx) + + // Check if custom context was propagated to workflow + if val := ctx.Value(propagateKey); val != nil { + vals := val.(Values) + logger.Info("Custom context propagated to workflow", zap.String(vals.Key, vals.Value)) + } + + var values Values + if err := workflow.ExecuteActivity(ctx, CtxPropagationActivity).Get(ctx, &values); err != nil { + logger.Error("Workflow failed.", zap.Error(err)) + return err + } + + logger.Info("Context propagated to activity", zap.String(values.Key, values.Value)) + logger.Info("Workflow completed.") + return nil +} + +// CtxPropagationActivity returns the propagated values from context. +func CtxPropagationActivity(ctx context.Context) (*Values, error) { + if val := ctx.Value(propagateKey); val != nil { + vals := val.(Values) + return &vals, nil + } + return nil, nil +} + diff --git a/new_samples/ctxpropagation/generator/README.md b/new_samples/ctxpropagation/generator/README.md new file mode 100644 index 0000000..1da3502 --- /dev/null +++ b/new_samples/ctxpropagation/generator/README.md @@ -0,0 +1,23 @@ + + + +# Sample Generator + +This folder is NOT part of the actual sample. It exists only for contributors who work on this sample. Please disregard it if you are trying to learn about Cadence. + +To create a better learning experience for Cadence users, each sample folder is designed to be self contained. Users can view every part of writing and running workflows, including: + +* Cadence client initialization +* Worker with workflow and activity registrations +* Workflow starter +* and the workflow code itself + +Some samples may have more or fewer parts depending on what they need to demonstrate. + +In most cases, the workflow code (e.g. `workflow.go`) is the part that users care about. The rest is boilerplate needed to run that workflow. For each sample folder, the workflow code should be written by hand. The boilerplate can be generated. Keeping all parts inside one folder gives early learners more value because they can see everything together rather than jumping across directories. + +## Contributing + +* When creating a new sample, follow the steps mentioned in the README file in the main samples folder. +* To update the sample workflow code, edit the workflow file directly. +* To update the worker, client, or other boilerplate logic, edit the generator file. If your change applies to all samples, update the common generator file inside the `template` folder. Edit the generator file in this folder only when the change should affect this sample alone. diff --git a/new_samples/ctxpropagation/generator/README_specific.md b/new_samples/ctxpropagation/generator/README_specific.md new file mode 100644 index 0000000..99b4222 --- /dev/null +++ b/new_samples/ctxpropagation/generator/README_specific.md @@ -0,0 +1,53 @@ +## Context Propagation Sample + +This sample demonstrates **custom context propagation** across workflow and activity execution. + +### Key Concept: Context Propagator + +A context propagator allows you to pass custom values (like user IDs, trace IDs, tenant info) through: +- Workflow execution +- Activity execution +- Child workflows + +```go +type propagator struct{} + +func (s *propagator) Inject(ctx context.Context, writer workflow.HeaderWriter) error { + // Serialize and inject values into headers +} + +func (s *propagator) Extract(ctx context.Context, reader workflow.HeaderReader) (context.Context, error) { + // Extract values from headers into context +} +``` + +### Configuring the Worker + +Register the propagator when creating the worker: + +```go +workerOptions := worker.Options{ + ContextPropagators: []workflow.ContextPropagator{ + NewContextPropagator(), + }, +} +``` + +### Start the Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.CtxPropagationWorkflow +``` + +### Use Cases + +- Distributed tracing (trace IDs) +- Multi-tenancy (tenant IDs) +- User context (user IDs, auth tokens) +- Request correlation + diff --git a/new_samples/ctxpropagation/generator/generate.go b/new_samples/ctxpropagation/generator/generate.go new file mode 100644 index 0000000..89a92c2 --- /dev/null +++ b/new_samples/ctxpropagation/generator/generate.go @@ -0,0 +1,13 @@ +package main + +import "github.com/uber-common/cadence-samples/new_samples/template" + +func main() { + data := template.TemplateData{ + SampleName: "Context Propagation", + Workflows: []string{"CtxPropagationWorkflow"}, + Activities: []string{"CtxPropagationActivity"}, + } + template.GenerateAll(data) +} + diff --git a/new_samples/ctxpropagation/main.go b/new_samples/ctxpropagation/main.go new file mode 100644 index 0000000..5893999 --- /dev/null +++ b/new_samples/ctxpropagation/main.go @@ -0,0 +1,20 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + StartWorker() + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT) + fmt.Println("Cadence worker started, press ctrl+c to terminate...") + <-done +} diff --git a/new_samples/ctxpropagation/worker.go b/new_samples/ctxpropagation/worker.go new file mode 100644 index 0000000..ac8af02 --- /dev/null +++ b/new_samples/ctxpropagation/worker.go @@ -0,0 +1,101 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +// Package worker implements a Cadence worker with basic configurations. +package main + +import ( + "github.com/uber-go/tally" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/compatibility" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + HostPort = "127.0.0.1:7833" + Domain = "cadence-samples" + // TaskListName identifies set of client workflows, activities, and workers. + // It could be your group or client or application name. + TaskListName = "cadence-samples-worker" + ClientName = "cadence-samples-worker" + CadenceService = "cadence-frontend" +) + +// StartWorker creates and starts a basic Cadence worker. +func StartWorker() { + logger, cadenceClient := BuildLogger(), BuildCadenceClient() + workerOptions := worker.Options{ + Logger: logger, + MetricsScope: tally.NewTestScope(TaskListName, nil), + } + + w := worker.New( + cadenceClient, + Domain, + TaskListName, + workerOptions) + // HelloWorld workflow registration + w.RegisterWorkflowWithOptions(CtxPropagationWorkflow, workflow.RegisterOptions{Name: "cadence_samples.CtxPropagationWorkflow"}) + w.RegisterActivityWithOptions(CtxPropagationActivity, activity.RegisterOptions{Name: "cadence_samples.CtxPropagationActivity"}) + + err := w.Start() + if err != nil { + panic("Failed to start worker: " + err.Error()) + } + logger.Info("Started Worker.", zap.String("worker", TaskListName)) + +} + +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + // Create a single peer chooser that identifies the host/port and configures + // a gRPC dialer with TLS credentials + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: ClientName, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + panic("Failed to start dispatcher: " + err.Error()) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + // Create a compatibility adapter that wraps proto-based YARPC clients + // to provide a unified interface for domain, workflow, worker, and visibility APIs + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) +} + +func BuildLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level.SetLevel(zapcore.InfoLevel) + + var err error + logger, err := config.Build() + if err != nil { + panic("Failed to setup logger: " + err.Error()) + } + + return logger +} diff --git a/new_samples/sideeffect/generator/README.md b/new_samples/sideeffect/generator/README.md new file mode 100644 index 0000000..1da3502 --- /dev/null +++ b/new_samples/sideeffect/generator/README.md @@ -0,0 +1,23 @@ + + + +# Sample Generator + +This folder is NOT part of the actual sample. It exists only for contributors who work on this sample. Please disregard it if you are trying to learn about Cadence. + +To create a better learning experience for Cadence users, each sample folder is designed to be self contained. Users can view every part of writing and running workflows, including: + +* Cadence client initialization +* Worker with workflow and activity registrations +* Workflow starter +* and the workflow code itself + +Some samples may have more or fewer parts depending on what they need to demonstrate. + +In most cases, the workflow code (e.g. `workflow.go`) is the part that users care about. The rest is boilerplate needed to run that workflow. For each sample folder, the workflow code should be written by hand. The boilerplate can be generated. Keeping all parts inside one folder gives early learners more value because they can see everything together rather than jumping across directories. + +## Contributing + +* When creating a new sample, follow the steps mentioned in the README file in the main samples folder. +* To update the sample workflow code, edit the workflow file directly. +* To update the worker, client, or other boilerplate logic, edit the generator file. If your change applies to all samples, update the common generator file inside the `template` folder. Edit the generator file in this folder only when the change should affect this sample alone. diff --git a/new_samples/sideeffect/generator/README_specific.md b/new_samples/sideeffect/generator/README_specific.md new file mode 100644 index 0000000..3be3877 --- /dev/null +++ b/new_samples/sideeffect/generator/README_specific.md @@ -0,0 +1,46 @@ +## Side Effect Sample + +This sample demonstrates **workflow.SideEffect** for handling non-deterministic operations. + +### The Problem + +Workflows must be deterministic for replay. But sometimes you need non-deterministic values like: +- UUIDs +- Random numbers +- Current time +- External state + +### The Solution: SideEffect + +```go +workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { + return uuid.New().String() // Non-deterministic! +}).Get(&value) +``` + +On first execution, SideEffect runs the function and records the result. +On replay, it returns the recorded value without re-executing. + +### Start the Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.SideEffectWorkflow +``` + +### Query the Generated Value + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow query \ + --wid \ + --qt value +``` + +The same UUID is returned every time you query, demonstrating deterministic replay. + diff --git a/new_samples/sideeffect/generator/generate.go b/new_samples/sideeffect/generator/generate.go new file mode 100644 index 0000000..348bfb1 --- /dev/null +++ b/new_samples/sideeffect/generator/generate.go @@ -0,0 +1,13 @@ +package main + +import "github.com/uber-common/cadence-samples/new_samples/template" + +func main() { + data := template.TemplateData{ + SampleName: "Side Effect", + Workflows: []string{"SideEffectWorkflow"}, + Activities: []string{}, + } + template.GenerateAll(data) +} + diff --git a/new_samples/sideeffect/main.go b/new_samples/sideeffect/main.go new file mode 100644 index 0000000..5893999 --- /dev/null +++ b/new_samples/sideeffect/main.go @@ -0,0 +1,20 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + StartWorker() + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT) + fmt.Println("Cadence worker started, press ctrl+c to terminate...") + <-done +} diff --git a/new_samples/sideeffect/sideeffect.go b/new_samples/sideeffect/sideeffect.go new file mode 100644 index 0000000..a48be7e --- /dev/null +++ b/new_samples/sideeffect/sideeffect.go @@ -0,0 +1,35 @@ +package main + +import ( + "github.com/google/uuid" + "go.uber.org/cadence/workflow" +) + +// SideEffectWorkflow demonstrates the SideEffect API for non-deterministic operations. +// SideEffect ensures the result is recorded and replayed deterministically. +func SideEffectWorkflow(ctx workflow.Context) error { + logger := workflow.GetLogger(ctx) + logger.Info("SideEffectWorkflow started") + + value := "" + + // Register query handler to inspect the value + err := workflow.SetQueryHandler(ctx, "value", func(input []byte) (string, error) { + return value, nil + }) + if err != nil { + logger.Info("SetQueryHandler failed: " + err.Error()) + return err + } + + // SideEffect records the result of non-deterministic operations + // On replay, it returns the recorded value instead of re-executing + workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { + return uuid.New().String() + }).Get(&value) + + logger.Info("SideEffect value: " + value) + logger.Info("SideEffectWorkflow completed") + return nil +} + diff --git a/new_samples/sideeffect/worker.go b/new_samples/sideeffect/worker.go new file mode 100644 index 0000000..0597fa0 --- /dev/null +++ b/new_samples/sideeffect/worker.go @@ -0,0 +1,100 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +// Package worker implements a Cadence worker with basic configurations. +package main + +import ( + "github.com/uber-go/tally" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + + "go.uber.org/cadence/compatibility" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + HostPort = "127.0.0.1:7833" + Domain = "cadence-samples" + // TaskListName identifies set of client workflows, activities, and workers. + // It could be your group or client or application name. + TaskListName = "cadence-samples-worker" + ClientName = "cadence-samples-worker" + CadenceService = "cadence-frontend" +) + +// StartWorker creates and starts a basic Cadence worker. +func StartWorker() { + logger, cadenceClient := BuildLogger(), BuildCadenceClient() + workerOptions := worker.Options{ + Logger: logger, + MetricsScope: tally.NewTestScope(TaskListName, nil), + } + + w := worker.New( + cadenceClient, + Domain, + TaskListName, + workerOptions) + // HelloWorld workflow registration + w.RegisterWorkflowWithOptions(SideEffectWorkflow, workflow.RegisterOptions{Name: "cadence_samples.SideEffectWorkflow"}) + + err := w.Start() + if err != nil { + panic("Failed to start worker: " + err.Error()) + } + logger.Info("Started Worker.", zap.String("worker", TaskListName)) + +} + +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + // Create a single peer chooser that identifies the host/port and configures + // a gRPC dialer with TLS credentials + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: ClientName, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + panic("Failed to start dispatcher: " + err.Error()) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + // Create a compatibility adapter that wraps proto-based YARPC clients + // to provide a unified interface for domain, workflow, worker, and visibility APIs + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) +} + +func BuildLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level.SetLevel(zapcore.InfoLevel) + + var err error + logger, err := config.Build() + if err != nil { + panic("Failed to setup logger: " + err.Error()) + } + + return logger +} diff --git a/new_samples/tracing/generator/README.md b/new_samples/tracing/generator/README.md new file mode 100644 index 0000000..1da3502 --- /dev/null +++ b/new_samples/tracing/generator/README.md @@ -0,0 +1,23 @@ + + + +# Sample Generator + +This folder is NOT part of the actual sample. It exists only for contributors who work on this sample. Please disregard it if you are trying to learn about Cadence. + +To create a better learning experience for Cadence users, each sample folder is designed to be self contained. Users can view every part of writing and running workflows, including: + +* Cadence client initialization +* Worker with workflow and activity registrations +* Workflow starter +* and the workflow code itself + +Some samples may have more or fewer parts depending on what they need to demonstrate. + +In most cases, the workflow code (e.g. `workflow.go`) is the part that users care about. The rest is boilerplate needed to run that workflow. For each sample folder, the workflow code should be written by hand. The boilerplate can be generated. Keeping all parts inside one folder gives early learners more value because they can see everything together rather than jumping across directories. + +## Contributing + +* When creating a new sample, follow the steps mentioned in the README file in the main samples folder. +* To update the sample workflow code, edit the workflow file directly. +* To update the worker, client, or other boilerplate logic, edit the generator file. If your change applies to all samples, update the common generator file inside the `template` folder. Edit the generator file in this folder only when the change should affect this sample alone. diff --git a/new_samples/tracing/generator/README_specific.md b/new_samples/tracing/generator/README_specific.md new file mode 100644 index 0000000..b28ed6e --- /dev/null +++ b/new_samples/tracing/generator/README_specific.md @@ -0,0 +1,38 @@ +## Tracing Sample + +This sample demonstrates **distributed tracing** integration with Cadence workflows. + +### Start the Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.TracingWorkflow \ + --input '"World"' +``` + +### Key Concept: Trace Propagation + +Cadence automatically propagates trace context through: +- Workflow execution +- Activity execution +- Child workflows +- Signals and queries + +To enable tracing, configure a tracer when creating the worker: + +```go +workerOptions := worker.Options{ + Tracer: opentracing.GlobalTracer(), +} +``` + +### Integration with Jaeger/Zipkin + +1. Set up a tracing backend (Jaeger, Zipkin, etc.) +2. Configure the tracer in your worker +3. View traces in your tracing UI to see the full execution path + diff --git a/new_samples/tracing/generator/generate.go b/new_samples/tracing/generator/generate.go new file mode 100644 index 0000000..d870a14 --- /dev/null +++ b/new_samples/tracing/generator/generate.go @@ -0,0 +1,13 @@ +package main + +import "github.com/uber-common/cadence-samples/new_samples/template" + +func main() { + data := template.TemplateData{ + SampleName: "Tracing", + Workflows: []string{"TracingWorkflow"}, + Activities: []string{"TracingActivity"}, + } + template.GenerateAll(data) +} + diff --git a/new_samples/tracing/main.go b/new_samples/tracing/main.go new file mode 100644 index 0000000..5893999 --- /dev/null +++ b/new_samples/tracing/main.go @@ -0,0 +1,20 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + StartWorker() + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT) + fmt.Println("Cadence worker started, press ctrl+c to terminate...") + <-done +} diff --git a/new_samples/tracing/tracing.go b/new_samples/tracing/tracing.go new file mode 100644 index 0000000..33f1851 --- /dev/null +++ b/new_samples/tracing/tracing.go @@ -0,0 +1,41 @@ +package main + +import ( + "context" + "time" + + "go.uber.org/cadence/activity" + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +// TracingWorkflow demonstrates distributed tracing in Cadence. +// Trace context is automatically propagated through workflow and activity execution. +func TracingWorkflow(ctx workflow.Context, name string) error { + ao := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + logger := workflow.GetLogger(ctx) + logger.Info("TracingWorkflow started") + + var result string + err := workflow.ExecuteActivity(ctx, TracingActivity, name).Get(ctx, &result) + if err != nil { + logger.Error("Activity failed.", zap.Error(err)) + return err + } + + logger.Info("Workflow completed.", zap.String("Result", result)) + return nil +} + +// TracingActivity is a simple activity that returns a greeting. +func TracingActivity(ctx context.Context, name string) (string, error) { + logger := activity.GetLogger(ctx) + logger.Info("TracingActivity started") + return "Hello " + name + "!", nil +} + diff --git a/new_samples/tracing/worker.go b/new_samples/tracing/worker.go new file mode 100644 index 0000000..0144e0f --- /dev/null +++ b/new_samples/tracing/worker.go @@ -0,0 +1,101 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +// Package worker implements a Cadence worker with basic configurations. +package main + +import ( + "github.com/uber-go/tally" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/compatibility" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + HostPort = "127.0.0.1:7833" + Domain = "cadence-samples" + // TaskListName identifies set of client workflows, activities, and workers. + // It could be your group or client or application name. + TaskListName = "cadence-samples-worker" + ClientName = "cadence-samples-worker" + CadenceService = "cadence-frontend" +) + +// StartWorker creates and starts a basic Cadence worker. +func StartWorker() { + logger, cadenceClient := BuildLogger(), BuildCadenceClient() + workerOptions := worker.Options{ + Logger: logger, + MetricsScope: tally.NewTestScope(TaskListName, nil), + } + + w := worker.New( + cadenceClient, + Domain, + TaskListName, + workerOptions) + // HelloWorld workflow registration + w.RegisterWorkflowWithOptions(TracingWorkflow, workflow.RegisterOptions{Name: "cadence_samples.TracingWorkflow"}) + w.RegisterActivityWithOptions(TracingActivity, activity.RegisterOptions{Name: "cadence_samples.TracingActivity"}) + + err := w.Start() + if err != nil { + panic("Failed to start worker: " + err.Error()) + } + logger.Info("Started Worker.", zap.String("worker", TaskListName)) + +} + +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + // Create a single peer chooser that identifies the host/port and configures + // a gRPC dialer with TLS credentials + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: ClientName, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + panic("Failed to start dispatcher: " + err.Error()) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + // Create a compatibility adapter that wraps proto-based YARPC clients + // to provide a unified interface for domain, workflow, worker, and visibility APIs + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) +} + +func BuildLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level.SetLevel(zapcore.InfoLevel) + + var err error + logger, err := config.Build() + if err != nil { + panic("Failed to setup logger: " + err.Error()) + } + + return logger +} diff --git a/new_samples/versioning/generator/README.md b/new_samples/versioning/generator/README.md new file mode 100644 index 0000000..1da3502 --- /dev/null +++ b/new_samples/versioning/generator/README.md @@ -0,0 +1,23 @@ + + + +# Sample Generator + +This folder is NOT part of the actual sample. It exists only for contributors who work on this sample. Please disregard it if you are trying to learn about Cadence. + +To create a better learning experience for Cadence users, each sample folder is designed to be self contained. Users can view every part of writing and running workflows, including: + +* Cadence client initialization +* Worker with workflow and activity registrations +* Workflow starter +* and the workflow code itself + +Some samples may have more or fewer parts depending on what they need to demonstrate. + +In most cases, the workflow code (e.g. `workflow.go`) is the part that users care about. The rest is boilerplate needed to run that workflow. For each sample folder, the workflow code should be written by hand. The boilerplate can be generated. Keeping all parts inside one folder gives early learners more value because they can see everything together rather than jumping across directories. + +## Contributing + +* When creating a new sample, follow the steps mentioned in the README file in the main samples folder. +* To update the sample workflow code, edit the workflow file directly. +* To update the worker, client, or other boilerplate logic, edit the generator file. If your change applies to all samples, update the common generator file inside the `template` folder. Edit the generator file in this folder only when the change should affect this sample alone. diff --git a/new_samples/versioning/generator/README_specific.md b/new_samples/versioning/generator/README_specific.md new file mode 100644 index 0000000..48d670d --- /dev/null +++ b/new_samples/versioning/generator/README_specific.md @@ -0,0 +1,39 @@ +## Versioning Sample + +This sample demonstrates **workflow versioning** for safe code deployments. + +### The Problem + +Changing workflow code can break running workflows during replay because the decision history no longer matches. + +### The Solution: GetVersion + +```go +version := workflow.GetVersion(ctx, "change-id", workflow.DefaultVersion, 1) +if version == workflow.DefaultVersion { + // Old code path +} else { + // New code path +} +``` + +- **DefaultVersion (-1)**: Workflows started before the change +- **Version 1+**: Workflows started after the change + +### Start the Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.VersionedWorkflow +``` + +### Deployment Strategy + +1. Deploy new code with GetVersion branching +2. New workflows use version 1, old workflows continue with DefaultVersion +3. Once all old workflows complete, remove DefaultVersion branch + diff --git a/new_samples/versioning/generator/generate.go b/new_samples/versioning/generator/generate.go new file mode 100644 index 0000000..826268e --- /dev/null +++ b/new_samples/versioning/generator/generate.go @@ -0,0 +1,13 @@ +package main + +import "github.com/uber-common/cadence-samples/new_samples/template" + +func main() { + data := template.TemplateData{ + SampleName: "Versioning", + Workflows: []string{"VersionedWorkflow"}, + Activities: []string{"OldActivity", "NewActivity"}, + } + template.GenerateAll(data) +} + diff --git a/new_samples/versioning/main.go b/new_samples/versioning/main.go new file mode 100644 index 0000000..5893999 --- /dev/null +++ b/new_samples/versioning/main.go @@ -0,0 +1,20 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + StartWorker() + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT) + fmt.Println("Cadence worker started, press ctrl+c to terminate...") + <-done +} diff --git a/new_samples/versioning/versioning.go b/new_samples/versioning/versioning.go new file mode 100644 index 0000000..7d0b3ac --- /dev/null +++ b/new_samples/versioning/versioning.go @@ -0,0 +1,58 @@ +package main + +import ( + "context" + "time" + + "go.uber.org/cadence/activity" + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +// VersionedWorkflow demonstrates workflow versioning for safe code changes. +// Use GetVersion to branch between old and new code paths during deployment. +func VersionedWorkflow(ctx workflow.Context) error { + logger := workflow.GetLogger(ctx) + logger.Info("VersionedWorkflow started") + + ao := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + // GetVersion allows branching between old and new code paths + // - First param: unique change ID + // - Second param: minimum supported version (DefaultVersion = -1) + // - Third param: maximum supported version + version := workflow.GetVersion(ctx, "activity-change", workflow.DefaultVersion, 1) + + var err error + if version == workflow.DefaultVersion { + // Old code path - for workflows started before the change + err = workflow.ExecuteActivity(ctx, OldActivity).Get(ctx, nil) + } else { + // New code path - for workflows started after the change + err = workflow.ExecuteActivity(ctx, NewActivity).Get(ctx, nil) + } + + if err != nil { + return err + } + + logger.Info("VersionedWorkflow completed", zap.Int("version", int(version))) + return nil +} + +// OldActivity represents the original activity before the change. +func OldActivity(ctx context.Context) (string, error) { + activity.GetLogger(ctx).Info("Executing OldActivity (version DefaultVersion)") + return "old result", nil +} + +// NewActivity represents the new activity after the change. +func NewActivity(ctx context.Context) (string, error) { + activity.GetLogger(ctx).Info("Executing NewActivity (version 1)") + return "new result", nil +} + diff --git a/new_samples/versioning/worker.go b/new_samples/versioning/worker.go new file mode 100644 index 0000000..0ccde74 --- /dev/null +++ b/new_samples/versioning/worker.go @@ -0,0 +1,102 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +// Package worker implements a Cadence worker with basic configurations. +package main + +import ( + "github.com/uber-go/tally" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/compatibility" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + HostPort = "127.0.0.1:7833" + Domain = "cadence-samples" + // TaskListName identifies set of client workflows, activities, and workers. + // It could be your group or client or application name. + TaskListName = "cadence-samples-worker" + ClientName = "cadence-samples-worker" + CadenceService = "cadence-frontend" +) + +// StartWorker creates and starts a basic Cadence worker. +func StartWorker() { + logger, cadenceClient := BuildLogger(), BuildCadenceClient() + workerOptions := worker.Options{ + Logger: logger, + MetricsScope: tally.NewTestScope(TaskListName, nil), + } + + w := worker.New( + cadenceClient, + Domain, + TaskListName, + workerOptions) + // HelloWorld workflow registration + w.RegisterWorkflowWithOptions(VersionedWorkflow, workflow.RegisterOptions{Name: "cadence_samples.VersionedWorkflow"}) + w.RegisterActivityWithOptions(OldActivity, activity.RegisterOptions{Name: "cadence_samples.OldActivity"}) + w.RegisterActivityWithOptions(NewActivity, activity.RegisterOptions{Name: "cadence_samples.NewActivity"}) + + err := w.Start() + if err != nil { + panic("Failed to start worker: " + err.Error()) + } + logger.Info("Started Worker.", zap.String("worker", TaskListName)) + +} + +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + // Create a single peer chooser that identifies the host/port and configures + // a gRPC dialer with TLS credentials + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: ClientName, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + panic("Failed to start dispatcher: " + err.Error()) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + // Create a compatibility adapter that wraps proto-based YARPC clients + // to provide a unified interface for domain, workflow, worker, and visibility APIs + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) +} + +func BuildLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level.SetLevel(zapcore.InfoLevel) + + var err error + logger, err := config.Build() + if err != nil { + panic("Failed to setup logger: " + err.Error()) + } + + return logger +}