diff --git a/new_samples/common/cadence.go b/new_samples/common/cadence.go new file mode 100644 index 0000000..c3e2478 --- /dev/null +++ b/new_samples/common/cadence.go @@ -0,0 +1,79 @@ +// Package common provides shared utilities for Cadence samples. +// This simplifies worker setup by providing a one-liner for client creation. +// +// Once cadence-client adds NewGrpcClient(), this package can be replaced +// with a direct import from go.uber.org/cadence/client. +package common + +import ( + "fmt" + + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/compatibility" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" +) + +const ( + // DefaultHostPort is the default Cadence frontend address + DefaultHostPort = "127.0.0.1:7833" + // DefaultDomain is the default domain for samples + DefaultDomain = "cadence-samples" + // DefaultTaskList is the default task list for samples + DefaultTaskList = "cadence-samples-worker" + // CadenceService is the service name for YARPC + CadenceService = "cadence-frontend" +) + +// NewCadenceClient creates a new Cadence client connected via gRPC. +// This is a simplified helper that handles all the YARPC/gRPC boilerplate. +// +// Parameters: +// - caller: The name of the calling service (used for tracing) +// - hostPort: The Cadence frontend address (e.g., "localhost:7833") +// - dialOptions: Optional gRPC dial options (e.g., for TLS) +// +// Example: +// +// client, err := common.NewCadenceClient("my-worker", "localhost:7833") +func NewCadenceClient(caller, hostPort string, dialOptions ...grpc.DialOption) (workflowserviceclient.Interface, error) { + grpcTransport := grpc.NewTransport() + + myChooser := peer.NewSingle( + hostport.Identify(hostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: caller, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + return nil, fmt.Errorf("failed to start dispatcher: %w", err) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ), nil +} + +// MustNewCadenceClient is like NewCadenceClient but panics on error. +// Useful for sample code where error handling would add noise. +func MustNewCadenceClient(caller, hostPort string, dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + client, err := NewCadenceClient(caller, hostPort, dialOptions...) + if err != nil { + panic(err) + } + return client +} diff --git a/new_samples/hello_world/worker.go b/new_samples/hello_world/worker.go index c81932c..a5a10da 100644 --- a/new_samples/hello_world/worker.go +++ b/new_samples/hello_world/worker.go @@ -1,101 +1,41 @@ // THIS IS A GENERATED FILE // PLEASE DO NOT EDIT -// Package worker implements a Cadence worker with basic configurations. package main import ( - "github.com/uber-go/tally" - apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" - "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "github.com/uber-common/cadence-samples/new_samples/common" "go.uber.org/cadence/activity" - "go.uber.org/cadence/compatibility" "go.uber.org/cadence/worker" "go.uber.org/cadence/workflow" - "go.uber.org/yarpc" - "go.uber.org/yarpc/peer" - yarpchostport "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/transport/grpc" "go.uber.org/zap" - "go.uber.org/zap/zapcore" ) -const ( - HostPort = "127.0.0.1:7833" - Domain = "cadence-samples" - // TaskListName identifies set of client workflows, activities, and workers. - // It could be your group or client or application name. - TaskListName = "cadence-samples-worker" - ClientName = "cadence-samples-worker" - CadenceService = "cadence-frontend" -) - -// StartWorker creates and starts a basic Cadence worker. +// StartWorker creates and starts a Cadence worker. func StartWorker() { - logger, cadenceClient := BuildLogger(), BuildCadenceClient() - workerOptions := worker.Options{ - Logger: logger, - MetricsScope: tally.NewTestScope(TaskListName, nil), - } + // Create Cadence client - all gRPC/YARPC boilerplate is handled by the helper + cadenceClient := common.MustNewCadenceClient( + common.DefaultTaskList, + common.DefaultHostPort, + ) w := worker.New( cadenceClient, - Domain, - TaskListName, - workerOptions) - // HelloWorld workflow registration - w.RegisterWorkflowWithOptions(HelloWorldWorkflow, workflow.RegisterOptions{Name: "cadence_samples.HelloWorldWorkflow"}) - w.RegisterActivityWithOptions(HelloWorldActivity, activity.RegisterOptions{Name: "cadence_samples.HelloWorldActivity"}) - - err := w.Start() - if err != nil { - panic("Failed to start worker: " + err.Error()) - } - logger.Info("Started Worker.", zap.String("worker", TaskListName)) - -} - -func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { - grpcTransport := grpc.NewTransport() - // Create a single peer chooser that identifies the host/port and configures - // a gRPC dialer with TLS credentials - myChooser := peer.NewSingle( - yarpchostport.Identify(HostPort), - grpcTransport.NewDialer(dialOptions...), + common.DefaultDomain, + common.DefaultTaskList, + worker.Options{}, ) - outbound := grpcTransport.NewOutbound(myChooser) - - dispatcher := yarpc.NewDispatcher(yarpc.Config{ - Name: ClientName, - Outbounds: yarpc.Outbounds{ - CadenceService: {Unary: outbound}, - }, - }) - if err := dispatcher.Start(); err != nil { - panic("Failed to start dispatcher: " + err.Error()) - } - - clientConfig := dispatcher.ClientConfig(CadenceService) - // Create a compatibility adapter that wraps proto-based YARPC clients - // to provide a unified interface for domain, workflow, worker, and visibility APIs - return compatibility.NewThrift2ProtoAdapter( - apiv1.NewDomainAPIYARPCClient(clientConfig), - apiv1.NewWorkflowAPIYARPCClient(clientConfig), - apiv1.NewWorkerAPIYARPCClient(clientConfig), - apiv1.NewVisibilityAPIYARPCClient(clientConfig), - ) -} + // Register workflows + w.RegisterWorkflowWithOptions(HelloWorldWorkflow, workflow.RegisterOptions{Name: "cadence_samples.HelloWorldWorkflow"}) -func BuildLogger() *zap.Logger { - config := zap.NewDevelopmentConfig() - config.Level.SetLevel(zapcore.InfoLevel) + // Register activities + w.RegisterActivityWithOptions(HelloWorldActivity, activity.RegisterOptions{Name: "cadence_samples.HelloWorldActivity"}) - var err error - logger, err := config.Build() - if err != nil { - panic("Failed to setup logger: " + err.Error()) + if err := w.Start(); err != nil { + panic("Failed to start worker: " + err.Error()) } - return logger + logger, _ := zap.NewDevelopment() + logger.Info("Started Worker.", zap.String("taskList", common.DefaultTaskList)) } diff --git a/new_samples/signal/worker.go b/new_samples/signal/worker.go index ee0d8d4..407ec64 100644 --- a/new_samples/signal/worker.go +++ b/new_samples/signal/worker.go @@ -1,101 +1,41 @@ // THIS IS A GENERATED FILE // PLEASE DO NOT EDIT -// Package worker implements a Cadence worker with basic configurations. package main import ( - "github.com/uber-go/tally" - apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" - "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "github.com/uber-common/cadence-samples/new_samples/common" "go.uber.org/cadence/activity" - "go.uber.org/cadence/compatibility" "go.uber.org/cadence/worker" "go.uber.org/cadence/workflow" - "go.uber.org/yarpc" - "go.uber.org/yarpc/peer" - yarpchostport "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/transport/grpc" "go.uber.org/zap" - "go.uber.org/zap/zapcore" ) -const ( - HostPort = "127.0.0.1:7833" - Domain = "cadence-samples" - // TaskListName identifies set of client workflows, activities, and workers. - // It could be your group or client or application name. - TaskListName = "cadence-samples-worker" - ClientName = "cadence-samples-worker" - CadenceService = "cadence-frontend" -) - -// StartWorker creates and starts a basic Cadence worker. +// StartWorker creates and starts a Cadence worker. func StartWorker() { - logger, cadenceClient := BuildLogger(), BuildCadenceClient() - workerOptions := worker.Options{ - Logger: logger, - MetricsScope: tally.NewTestScope(TaskListName, nil), - } + // Create Cadence client - all gRPC/YARPC boilerplate is handled by the helper + cadenceClient := common.MustNewCadenceClient( + common.DefaultTaskList, + common.DefaultHostPort, + ) w := worker.New( cadenceClient, - Domain, - TaskListName, - workerOptions) - // HelloWorld workflow registration - w.RegisterWorkflowWithOptions(SimpleSignalWorkflow, workflow.RegisterOptions{Name: "cadence_samples.SimpleSignalWorkflow"}) - w.RegisterActivityWithOptions(SimpleSignalActivity, activity.RegisterOptions{Name: "cadence_samples.SimpleSignalActivity"}) - - err := w.Start() - if err != nil { - panic("Failed to start worker: " + err.Error()) - } - logger.Info("Started Worker.", zap.String("worker", TaskListName)) - -} - -func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { - grpcTransport := grpc.NewTransport() - // Create a single peer chooser that identifies the host/port and configures - // a gRPC dialer with TLS credentials - myChooser := peer.NewSingle( - yarpchostport.Identify(HostPort), - grpcTransport.NewDialer(dialOptions...), + common.DefaultDomain, + common.DefaultTaskList, + worker.Options{}, ) - outbound := grpcTransport.NewOutbound(myChooser) - - dispatcher := yarpc.NewDispatcher(yarpc.Config{ - Name: ClientName, - Outbounds: yarpc.Outbounds{ - CadenceService: {Unary: outbound}, - }, - }) - if err := dispatcher.Start(); err != nil { - panic("Failed to start dispatcher: " + err.Error()) - } - - clientConfig := dispatcher.ClientConfig(CadenceService) - // Create a compatibility adapter that wraps proto-based YARPC clients - // to provide a unified interface for domain, workflow, worker, and visibility APIs - return compatibility.NewThrift2ProtoAdapter( - apiv1.NewDomainAPIYARPCClient(clientConfig), - apiv1.NewWorkflowAPIYARPCClient(clientConfig), - apiv1.NewWorkerAPIYARPCClient(clientConfig), - apiv1.NewVisibilityAPIYARPCClient(clientConfig), - ) -} + // Register workflows + w.RegisterWorkflowWithOptions(SimpleSignalWorkflow, workflow.RegisterOptions{Name: "cadence_samples.SimpleSignalWorkflow"}) -func BuildLogger() *zap.Logger { - config := zap.NewDevelopmentConfig() - config.Level.SetLevel(zapcore.InfoLevel) + // Register activities + w.RegisterActivityWithOptions(SimpleSignalActivity, activity.RegisterOptions{Name: "cadence_samples.SimpleSignalActivity"}) - var err error - logger, err := config.Build() - if err != nil { - panic("Failed to setup logger: " + err.Error()) + if err := w.Start(); err != nil { + panic("Failed to start worker: " + err.Error()) } - return logger + logger, _ := zap.NewDevelopment() + logger.Info("Started Worker.", zap.String("taskList", common.DefaultTaskList)) } diff --git a/new_samples/template/worker.tmpl b/new_samples/template/worker.tmpl index aa9ab90..4a21c26 100644 --- a/new_samples/template/worker.tmpl +++ b/new_samples/template/worker.tmpl @@ -1,105 +1,45 @@ // THIS IS A GENERATED FILE // PLEASE DO NOT EDIT -// Package worker implements a Cadence worker with basic configurations. package main import ( - "github.com/uber-go/tally" - apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" - "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "github.com/uber-common/cadence-samples/new_samples/common" "go.uber.org/cadence/activity" - "go.uber.org/cadence/compatibility" "go.uber.org/cadence/worker" "go.uber.org/cadence/workflow" - "go.uber.org/yarpc" - "go.uber.org/yarpc/peer" - yarpchostport "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/transport/grpc" "go.uber.org/zap" - "go.uber.org/zap/zapcore" ) -const ( - HostPort = "127.0.0.1:7833" - Domain = "cadence-samples" - // TaskListName identifies set of client workflows, activities, and workers. - // It could be your group or client or application name. - TaskListName = "cadence-samples-worker" - ClientName = "cadence-samples-worker" - CadenceService = "cadence-frontend" -) - -// StartWorker creates and starts a basic Cadence worker. +// StartWorker creates and starts a Cadence worker. func StartWorker() { - logger, cadenceClient := BuildLogger(), BuildCadenceClient() - workerOptions := worker.Options{ - Logger: logger, - MetricsScope: tally.NewTestScope(TaskListName, nil), - } + // Create Cadence client - all gRPC/YARPC boilerplate is handled by the helper + cadenceClient := common.MustNewCadenceClient( + common.DefaultTaskList, + common.DefaultHostPort, + ) w := worker.New( cadenceClient, - Domain, - TaskListName, - workerOptions) - // HelloWorld workflow registration + common.DefaultDomain, + common.DefaultTaskList, + worker.Options{}, + ) + + // Register workflows {{- range .Workflows}} w.RegisterWorkflowWithOptions({{.}}, workflow.RegisterOptions{Name: "cadence_samples.{{.}}"}) {{- end}} + + // Register activities {{- range .Activities}} w.RegisterActivityWithOptions({{.}}, activity.RegisterOptions{Name: "cadence_samples.{{.}}"}) {{- end}} - err := w.Start() - if err != nil { + if err := w.Start(); err != nil { panic("Failed to start worker: " + err.Error()) } - logger.Info("Started Worker.", zap.String("worker", TaskListName)) - -} - -func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { - grpcTransport := grpc.NewTransport() - // Create a single peer chooser that identifies the host/port and configures - // a gRPC dialer with TLS credentials - myChooser := peer.NewSingle( - yarpchostport.Identify(HostPort), - grpcTransport.NewDialer(dialOptions...), - ) - outbound := grpcTransport.NewOutbound(myChooser) - - dispatcher := yarpc.NewDispatcher(yarpc.Config{ - Name: ClientName, - Outbounds: yarpc.Outbounds{ - CadenceService: {Unary: outbound}, - }, - }) - if err := dispatcher.Start(); err != nil { - panic("Failed to start dispatcher: " + err.Error()) - } - - clientConfig := dispatcher.ClientConfig(CadenceService) - - // Create a compatibility adapter that wraps proto-based YARPC clients - // to provide a unified interface for domain, workflow, worker, and visibility APIs - return compatibility.NewThrift2ProtoAdapter( - apiv1.NewDomainAPIYARPCClient(clientConfig), - apiv1.NewWorkflowAPIYARPCClient(clientConfig), - apiv1.NewWorkerAPIYARPCClient(clientConfig), - apiv1.NewVisibilityAPIYARPCClient(clientConfig), - ) -} - -func BuildLogger() *zap.Logger { - config := zap.NewDevelopmentConfig() - config.Level.SetLevel(zapcore.InfoLevel) - - var err error - logger, err := config.Build() - if err != nil { - panic("Failed to setup logger: " + err.Error()) - } - return logger + logger, _ := zap.NewDevelopment() + logger.Info("Started Worker.", zap.String("taskList", common.DefaultTaskList)) }