diff --git a/.gitignore b/.gitignore index 33bb963..d08c137 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,6 @@ vendor/ # Executables produced by cadence-samples repo bin/ docker-compose.yml + +# Credentials +new_samples/client_samples/helloworld_tls/credentials/ diff --git a/new_samples/client_samples/helloworld_tls/README.md b/new_samples/client_samples/helloworld_tls/README.md new file mode 100644 index 0000000..cf835f5 --- /dev/null +++ b/new_samples/client_samples/helloworld_tls/README.md @@ -0,0 +1,65 @@ +## Pre-requisites + +Follow this document to start cadence server: +https://github.com/cadence-workflow/cadence/blob/e1267de12f8bc670fc84fab456d3495c8fc2f8a8/CONTRIBUTING.md#L1 + +1. **Build tools in cadence server** + ```bash + make bins + ``` + +2. **Start cassandra** + ```bash + docker compose -f ./docker/dev/cassandra.yml up -d + ``` + +3. **Install schema** + ```bash + make install-schema + ``` + +4. **Start cadence server with TLS** + ```bash + ./cadence-server --env development --zone tls start + ``` + +## Running the Sample + +### Step 1: Download Certificates +Download certificates from config/credentials of cadence server and place them in below folder + +```bash +new_samples/client_samples/helloworld_tls/credentials +``` + +### Step 2: Register the Domain +Before running workflows, you must register the "default" domain: + +```bash +cd new_samples/client_samples/helloworld_tls +go run register_domain.go +``` + +Expected output: +``` +Successfully registered domain {"domain": "default"} +``` + +If the domain already exists, you'll see: +``` +Domain already exists {"domain": "default"} +``` + +### Step 3: Run the Sample +In another terminal: +```bash +cd new_samples/client_samples/helloworld_tls +go run hello_world_tls.go +``` + +## References + +- [Cadence Official Certificates](https://github.com/cadence-workflow/cadence/tree/master/config/credentials) +- [Cadence Documentation](https://cadenceworkflow.io/) +- [Go TLS Package](https://pkg.go.dev/crypto/tls) + diff --git a/new_samples/client_samples/helloworld_tls/hello_world_tls.go b/new_samples/client_samples/helloworld_tls/hello_world_tls.go new file mode 100644 index 0000000..39c8d6e --- /dev/null +++ b/new_samples/client_samples/helloworld_tls/hello_world_tls.go @@ -0,0 +1,86 @@ +package main + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "os" + "time" + + "github.com/google/uuid" + "github.com/uber-common/cadence-samples/new_samples/worker" + "go.uber.org/cadence/.gen/go/shared" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "google.golang.org/grpc/credentials" +) + +func main() { + withTLSDialOption, err := withTLSDialOption() + if err != nil { + panic(err) + } + + cadenceClient := worker.BuildCadenceClient(withTLSDialOption) + logger := worker.BuildLogger() + + domain := "default" + tasklist := "default-tasklist" + workflowID := uuid.New().String() + requestID := uuid.New().String() + executionTimeout := int32(60) + closeTimeout := int32(60) + + workflowType := "cadence_samples.HelloWorldWorkflow" + input := []byte(`{"message": "Uber"}`) + + req := shared.StartWorkflowExecutionRequest{ + Domain: &domain, + WorkflowId: &workflowID, + WorkflowType: &shared.WorkflowType{ + Name: &workflowType, + }, + TaskList: &shared.TaskList{ + Name: &tasklist, + }, + Input: input, + ExecutionStartToCloseTimeoutSeconds: &executionTimeout, + TaskStartToCloseTimeoutSeconds: &closeTimeout, + RequestId: &requestID, + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + resp, err := cadenceClient.StartWorkflowExecution(ctx, &req) + if err != nil { + logger.Error("Failed to create workflow", zap.Error(err)) + panic("Failed to create workflow.") + } + + logger.Info("successfully started HelloWorld workflow", zap.String("runID", resp.GetRunId())) +} + +func withTLSDialOption() (grpc.DialOption, error) { + // Present client cert for mutual TLS (if enabled on server) + clientCert, err := tls.LoadX509KeyPair("credentials/client.crt", "credentials/client.key") + if err != nil { + return nil, fmt.Errorf("Failed to load client certificate: %v", zap.Error(err)) + } + + // Load server CA + caCert, err := os.ReadFile("credentials/keytest.crt") + if err != nil { + return nil, fmt.Errorf("Failed to load server CA certificate: %v", zap.Error(err)) + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + tlsConfig := tls.Config{ + InsecureSkipVerify: true, + RootCAs: caCertPool, + Certificates: []tls.Certificate{clientCert}, + } + creds := credentials.NewTLS(&tlsConfig) + grpc.DialerCredentials(creds) + return grpc.DialerCredentials(creds), nil +} diff --git a/new_samples/client_samples/helloworld_tls/register_domain.go b/new_samples/client_samples/helloworld_tls/register_domain.go new file mode 100644 index 0000000..8ab832b --- /dev/null +++ b/new_samples/client_samples/helloworld_tls/register_domain.go @@ -0,0 +1,88 @@ +package main + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "os" + "time" + + "github.com/uber-common/cadence-samples/new_samples/worker" + "go.uber.org/cadence/.gen/go/shared" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "google.golang.org/grpc/credentials" +) + +func main() { + logger := worker.BuildLogger() + logger.Info("Registering default domain for cadence-vishwa with TLS...") + + withTLSDialOption, err := buildTLSDialOption() + if err != nil { + logger.Fatal("Failed to build TLS dial option", zap.Error(err)) + } + + cadenceClient := worker.BuildCadenceClient(withTLSDialOption) + + // Register the domain + domain := "default" + retentionDays := int32(7) + emitMetric := true + + req := &shared.RegisterDomainRequest{ + Name: &domain, + Description: stringPtr("Default domain for cadence samples"), + WorkflowExecutionRetentionPeriodInDays: &retentionDays, + EmitMetric: &emitMetric, + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + err = cadenceClient.RegisterDomain(ctx, req) + if err != nil { + // Check if domain already exists + if _, ok := err.(*shared.DomainAlreadyExistsError); ok { + logger.Info("Domain already exists", zap.String("domain", domain)) + return + } + logger.Fatal("Failed to register domain", zap.Error(err)) + } + + logger.Info("Successfully registered domain", zap.String("domain", domain)) +} + +func buildTLSDialOption() (grpc.DialOption, error) { + // Load client certificate + clientCert, err := tls.LoadX509KeyPair("credentials/client.crt", "credentials/client.key") + if err != nil { + return nil, fmt.Errorf("failed to load client certificate: %w", err) + } + + // Load server CA + caCert, err := os.ReadFile("credentials/keytest.crt") + if err != nil { + return nil, fmt.Errorf("failed to load server CA certificate: %w", err) + } + + caCertPool := x509.NewCertPool() + if !caCertPool.AppendCertsFromPEM(caCert) { + return nil, fmt.Errorf("failed to append CA certificate") + } + + tlsConfig := &tls.Config{ + InsecureSkipVerify: true, + RootCAs: caCertPool, + Certificates: []tls.Certificate{clientCert}, + MinVersion: tls.VersionTLS12, + } + + creds := credentials.NewTLS(tlsConfig) + return grpc.DialerCredentials(creds), nil +} + +func stringPtr(s string) *string { + return &s +} diff --git a/new_samples/worker/worker.go b/new_samples/worker/worker.go index 20f494c..b3901c6 100644 --- a/new_samples/worker/worker.go +++ b/new_samples/worker/worker.go @@ -11,6 +11,8 @@ import ( "go.uber.org/cadence/worker" "go.uber.org/cadence/workflow" "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" "go.uber.org/yarpc/transport/grpc" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -69,11 +71,18 @@ func StartWorker() { } -func BuildCadenceClient() workflowserviceclient.Interface { +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + dispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: ClientName, Outbounds: yarpc.Outbounds{ - CadenceService: {Unary: grpc.NewTransport().NewSingleOutbound(HostPort)}, + CadenceService: {Unary: outbound}, }, }) if err := dispatcher.Start(); err != nil {