Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions new_samples/common/cadence.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Package common provides shared utilities for Cadence samples.
// This simplifies worker setup by providing a one-liner for client creation.
//
// Once cadence-client adds NewGrpcClient(), this package can be replaced
// with a direct import from go.uber.org/cadence/client.
package common

import (
"fmt"

apiv1 "github.com/uber/cadence-idl/go/proto/api/v1"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/cadence/compatibility"
"go.uber.org/yarpc"
"go.uber.org/yarpc/peer"
"go.uber.org/yarpc/peer/hostport"
"go.uber.org/yarpc/transport/grpc"
)

const (
// DefaultHostPort is the default Cadence frontend address
DefaultHostPort = "127.0.0.1:7833"
// DefaultDomain is the default domain for samples
DefaultDomain = "cadence-samples"
// DefaultTaskList is the default task list for samples
DefaultTaskList = "cadence-samples-worker"
// CadenceService is the service name for YARPC
CadenceService = "cadence-frontend"
)

// NewCadenceClient creates a new Cadence client connected via gRPC.
// This is a simplified helper that handles all the YARPC/gRPC boilerplate.
//
// Parameters:
// - caller: The name of the calling service (used for tracing)
// - hostPort: The Cadence frontend address (e.g., "localhost:7833")
// - dialOptions: Optional gRPC dial options (e.g., for TLS)
//
// Example:
//
// client, err := common.NewCadenceClient("my-worker", "localhost:7833")
func NewCadenceClient(caller, hostPort string, dialOptions ...grpc.DialOption) (workflowserviceclient.Interface, error) {
grpcTransport := grpc.NewTransport()

myChooser := peer.NewSingle(
hostport.Identify(hostPort),
grpcTransport.NewDialer(dialOptions...),
)
outbound := grpcTransport.NewOutbound(myChooser)

dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: caller,
Outbounds: yarpc.Outbounds{
CadenceService: {Unary: outbound},
},
})
if err := dispatcher.Start(); err != nil {
return nil, fmt.Errorf("failed to start dispatcher: %w", err)
}

clientConfig := dispatcher.ClientConfig(CadenceService)

return compatibility.NewThrift2ProtoAdapter(
apiv1.NewDomainAPIYARPCClient(clientConfig),
apiv1.NewWorkflowAPIYARPCClient(clientConfig),
apiv1.NewWorkerAPIYARPCClient(clientConfig),
apiv1.NewVisibilityAPIYARPCClient(clientConfig),
), nil
}

// MustNewCadenceClient is like NewCadenceClient but panics on error.
// Useful for sample code where error handling would add noise.
func MustNewCadenceClient(caller, hostPort string, dialOptions ...grpc.DialOption) workflowserviceclient.Interface {
client, err := NewCadenceClient(caller, hostPort, dialOptions...)
if err != nil {
panic(err)
}
return client
}
96 changes: 18 additions & 78 deletions new_samples/hello_world/worker.go
Original file line number Diff line number Diff line change
@@ -1,101 +1,41 @@
// THIS IS A GENERATED FILE
// PLEASE DO NOT EDIT

// Package worker implements a Cadence worker with basic configurations.
package main

import (
"github.com/uber-go/tally"
apiv1 "github.com/uber/cadence-idl/go/proto/api/v1"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"github.com/uber-common/cadence-samples/new_samples/common"
"go.uber.org/cadence/activity"
"go.uber.org/cadence/compatibility"
"go.uber.org/cadence/worker"
"go.uber.org/cadence/workflow"
"go.uber.org/yarpc"
"go.uber.org/yarpc/peer"
yarpchostport "go.uber.org/yarpc/peer/hostport"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

const (
HostPort = "127.0.0.1:7833"
Domain = "cadence-samples"
// TaskListName identifies set of client workflows, activities, and workers.
// It could be your group or client or application name.
TaskListName = "cadence-samples-worker"
ClientName = "cadence-samples-worker"
CadenceService = "cadence-frontend"
)

// StartWorker creates and starts a basic Cadence worker.
// StartWorker creates and starts a Cadence worker.
func StartWorker() {
logger, cadenceClient := BuildLogger(), BuildCadenceClient()
workerOptions := worker.Options{
Logger: logger,
MetricsScope: tally.NewTestScope(TaskListName, nil),
}
// Create Cadence client - all gRPC/YARPC boilerplate is handled by the helper
cadenceClient := common.MustNewCadenceClient(
common.DefaultTaskList,
common.DefaultHostPort,
)

w := worker.New(
cadenceClient,
Domain,
TaskListName,
workerOptions)
// HelloWorld workflow registration
w.RegisterWorkflowWithOptions(HelloWorldWorkflow, workflow.RegisterOptions{Name: "cadence_samples.HelloWorldWorkflow"})
w.RegisterActivityWithOptions(HelloWorldActivity, activity.RegisterOptions{Name: "cadence_samples.HelloWorldActivity"})

err := w.Start()
if err != nil {
panic("Failed to start worker: " + err.Error())
}
logger.Info("Started Worker.", zap.String("worker", TaskListName))

}

func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface {
grpcTransport := grpc.NewTransport()
// Create a single peer chooser that identifies the host/port and configures
// a gRPC dialer with TLS credentials
myChooser := peer.NewSingle(
yarpchostport.Identify(HostPort),
grpcTransport.NewDialer(dialOptions...),
common.DefaultDomain,
common.DefaultTaskList,
worker.Options{},
)
outbound := grpcTransport.NewOutbound(myChooser)

dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: ClientName,
Outbounds: yarpc.Outbounds{
CadenceService: {Unary: outbound},
},
})
if err := dispatcher.Start(); err != nil {
panic("Failed to start dispatcher: " + err.Error())
}

clientConfig := dispatcher.ClientConfig(CadenceService)

// Create a compatibility adapter that wraps proto-based YARPC clients
// to provide a unified interface for domain, workflow, worker, and visibility APIs
return compatibility.NewThrift2ProtoAdapter(
apiv1.NewDomainAPIYARPCClient(clientConfig),
apiv1.NewWorkflowAPIYARPCClient(clientConfig),
apiv1.NewWorkerAPIYARPCClient(clientConfig),
apiv1.NewVisibilityAPIYARPCClient(clientConfig),
)
}
// Register workflows
w.RegisterWorkflowWithOptions(HelloWorldWorkflow, workflow.RegisterOptions{Name: "cadence_samples.HelloWorldWorkflow"})

func BuildLogger() *zap.Logger {
config := zap.NewDevelopmentConfig()
config.Level.SetLevel(zapcore.InfoLevel)
// Register activities
w.RegisterActivityWithOptions(HelloWorldActivity, activity.RegisterOptions{Name: "cadence_samples.HelloWorldActivity"})

var err error
logger, err := config.Build()
if err != nil {
panic("Failed to setup logger: " + err.Error())
if err := w.Start(); err != nil {
panic("Failed to start worker: " + err.Error())
}

return logger
logger, _ := zap.NewDevelopment()
logger.Info("Started Worker.", zap.String("taskList", common.DefaultTaskList))
}
96 changes: 18 additions & 78 deletions new_samples/signal/worker.go
Original file line number Diff line number Diff line change
@@ -1,101 +1,41 @@
// THIS IS A GENERATED FILE
// PLEASE DO NOT EDIT

// Package worker implements a Cadence worker with basic configurations.
package main

import (
"github.com/uber-go/tally"
apiv1 "github.com/uber/cadence-idl/go/proto/api/v1"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"github.com/uber-common/cadence-samples/new_samples/common"
"go.uber.org/cadence/activity"
"go.uber.org/cadence/compatibility"
"go.uber.org/cadence/worker"
"go.uber.org/cadence/workflow"
"go.uber.org/yarpc"
"go.uber.org/yarpc/peer"
yarpchostport "go.uber.org/yarpc/peer/hostport"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

const (
HostPort = "127.0.0.1:7833"
Domain = "cadence-samples"
// TaskListName identifies set of client workflows, activities, and workers.
// It could be your group or client or application name.
TaskListName = "cadence-samples-worker"
ClientName = "cadence-samples-worker"
CadenceService = "cadence-frontend"
)

// StartWorker creates and starts a basic Cadence worker.
// StartWorker creates and starts a Cadence worker.
func StartWorker() {
logger, cadenceClient := BuildLogger(), BuildCadenceClient()
workerOptions := worker.Options{
Logger: logger,
MetricsScope: tally.NewTestScope(TaskListName, nil),
}
// Create Cadence client - all gRPC/YARPC boilerplate is handled by the helper
cadenceClient := common.MustNewCadenceClient(
common.DefaultTaskList,
common.DefaultHostPort,
)

w := worker.New(
cadenceClient,
Domain,
TaskListName,
workerOptions)
// HelloWorld workflow registration
w.RegisterWorkflowWithOptions(SimpleSignalWorkflow, workflow.RegisterOptions{Name: "cadence_samples.SimpleSignalWorkflow"})
w.RegisterActivityWithOptions(SimpleSignalActivity, activity.RegisterOptions{Name: "cadence_samples.SimpleSignalActivity"})

err := w.Start()
if err != nil {
panic("Failed to start worker: " + err.Error())
}
logger.Info("Started Worker.", zap.String("worker", TaskListName))

}

func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface {
grpcTransport := grpc.NewTransport()
// Create a single peer chooser that identifies the host/port and configures
// a gRPC dialer with TLS credentials
myChooser := peer.NewSingle(
yarpchostport.Identify(HostPort),
grpcTransport.NewDialer(dialOptions...),
common.DefaultDomain,
common.DefaultTaskList,
worker.Options{},
)
outbound := grpcTransport.NewOutbound(myChooser)

dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: ClientName,
Outbounds: yarpc.Outbounds{
CadenceService: {Unary: outbound},
},
})
if err := dispatcher.Start(); err != nil {
panic("Failed to start dispatcher: " + err.Error())
}

clientConfig := dispatcher.ClientConfig(CadenceService)

// Create a compatibility adapter that wraps proto-based YARPC clients
// to provide a unified interface for domain, workflow, worker, and visibility APIs
return compatibility.NewThrift2ProtoAdapter(
apiv1.NewDomainAPIYARPCClient(clientConfig),
apiv1.NewWorkflowAPIYARPCClient(clientConfig),
apiv1.NewWorkerAPIYARPCClient(clientConfig),
apiv1.NewVisibilityAPIYARPCClient(clientConfig),
)
}
// Register workflows
w.RegisterWorkflowWithOptions(SimpleSignalWorkflow, workflow.RegisterOptions{Name: "cadence_samples.SimpleSignalWorkflow"})

func BuildLogger() *zap.Logger {
config := zap.NewDevelopmentConfig()
config.Level.SetLevel(zapcore.InfoLevel)
// Register activities
w.RegisterActivityWithOptions(SimpleSignalActivity, activity.RegisterOptions{Name: "cadence_samples.SimpleSignalActivity"})

var err error
logger, err := config.Build()
if err != nil {
panic("Failed to setup logger: " + err.Error())
if err := w.Start(); err != nil {
panic("Failed to start worker: " + err.Error())
}

return logger
logger, _ := zap.NewDevelopment()
logger.Info("Started Worker.", zap.String("taskList", common.DefaultTaskList))
}
Loading
Loading