Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,6 @@ vendor/
# Executables produced by cadence-samples repo
bin/
docker-compose.yml

# Credentials
new_samples/client_samples/helloworld_tls/credentials/
65 changes: 65 additions & 0 deletions new_samples/client_samples/helloworld_tls/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
## Pre-requisites

Follow this document to start cadence server:
https://github.com/cadence-workflow/cadence/blob/e1267de12f8bc670fc84fab456d3495c8fc2f8a8/CONTRIBUTING.md#L1

1. **Build tools in cadence server**
```bash
make bins
```

2. **Start cassandra**
```bash
docker compose -f ./docker/dev/cassandra.yml up -d
```

3. **Install schema**
```bash
make install-schema
```

4. **Start cadence server with TLS**
```bash
./cadence-server --env development --zone tls start
```

## Running the Sample

### Step 1: Download Certificates
Download certificates from config/credentials of cadence server and place them in below folder

```bash
new_samples/client_samples/helloworld_tls/credentials
```

### Step 2: Register the Domain
Before running workflows, you must register the "default" domain:

```bash
cd new_samples/client_samples/helloworld_tls
go run register_domain.go
```

Expected output:
```
Successfully registered domain {"domain": "default"}
```

If the domain already exists, you'll see:
```
Domain already exists {"domain": "default"}
```

### Step 3: Run the Sample
In another terminal:
```bash
cd new_samples/client_samples/helloworld_tls
go run hello_world_tls.go
```

## References

- [Cadence Official Certificates](https://github.com/cadence-workflow/cadence/tree/master/config/credentials)
- [Cadence Documentation](https://cadenceworkflow.io/)
- [Go TLS Package](https://pkg.go.dev/crypto/tls)

86 changes: 86 additions & 0 deletions new_samples/client_samples/helloworld_tls/hello_world_tls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package main

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"time"

"github.com/google/uuid"
"github.com/uber-common/cadence-samples/new_samples/worker"
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/zap"
"google.golang.org/grpc/credentials"
)

func main() {
withTLSDialOption, err := withTLSDialOption()
if err != nil {
panic(err)
}

cadenceClient := worker.BuildCadenceClient(withTLSDialOption)
logger := worker.BuildLogger()

domain := "default"
tasklist := "default-tasklist"
workflowID := uuid.New().String()
requestID := uuid.New().String()
executionTimeout := int32(60)
closeTimeout := int32(60)

workflowType := "cadence_samples.HelloWorldWorkflow"
input := []byte(`{"message": "Uber"}`)

req := shared.StartWorkflowExecutionRequest{
Domain: &domain,
WorkflowId: &workflowID,
WorkflowType: &shared.WorkflowType{
Name: &workflowType,
},
TaskList: &shared.TaskList{
Name: &tasklist,
},
Input: input,
ExecutionStartToCloseTimeoutSeconds: &executionTimeout,
TaskStartToCloseTimeoutSeconds: &closeTimeout,
RequestId: &requestID,
}

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
resp, err := cadenceClient.StartWorkflowExecution(ctx, &req)
if err != nil {
logger.Error("Failed to create workflow", zap.Error(err))
panic("Failed to create workflow.")
}

logger.Info("successfully started HelloWorld workflow", zap.String("runID", resp.GetRunId()))
}

func withTLSDialOption() (grpc.DialOption, error) {
// Present client cert for mutual TLS (if enabled on server)
clientCert, err := tls.LoadX509KeyPair("credentials/client.crt", "credentials/client.key")
if err != nil {
return nil, fmt.Errorf("Failed to load client certificate: %v", zap.Error(err))
}

// Load server CA
caCert, err := os.ReadFile("credentials/keytest.crt")
if err != nil {
return nil, fmt.Errorf("Failed to load server CA certificate: %v", zap.Error(err))
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig := tls.Config{
InsecureSkipVerify: true,
RootCAs: caCertPool,
Certificates: []tls.Certificate{clientCert},
}
creds := credentials.NewTLS(&tlsConfig)
grpc.DialerCredentials(creds)
return grpc.DialerCredentials(creds), nil
}
88 changes: 88 additions & 0 deletions new_samples/client_samples/helloworld_tls/register_domain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package main

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"time"

"github.com/uber-common/cadence-samples/new_samples/worker"
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/zap"
"google.golang.org/grpc/credentials"
)

func main() {
logger := worker.BuildLogger()
logger.Info("Registering default domain for cadence-vishwa with TLS...")

withTLSDialOption, err := buildTLSDialOption()
if err != nil {
logger.Fatal("Failed to build TLS dial option", zap.Error(err))
}

cadenceClient := worker.BuildCadenceClient(withTLSDialOption)

// Register the domain
domain := "default"
retentionDays := int32(7)
emitMetric := true

req := &shared.RegisterDomainRequest{
Name: &domain,
Description: stringPtr("Default domain for cadence samples"),
WorkflowExecutionRetentionPeriodInDays: &retentionDays,
EmitMetric: &emitMetric,
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

err = cadenceClient.RegisterDomain(ctx, req)
if err != nil {
// Check if domain already exists
if _, ok := err.(*shared.DomainAlreadyExistsError); ok {
logger.Info("Domain already exists", zap.String("domain", domain))
return
}
logger.Fatal("Failed to register domain", zap.Error(err))
}

logger.Info("Successfully registered domain", zap.String("domain", domain))
}

func buildTLSDialOption() (grpc.DialOption, error) {
// Load client certificate
clientCert, err := tls.LoadX509KeyPair("credentials/client.crt", "credentials/client.key")
if err != nil {
return nil, fmt.Errorf("failed to load client certificate: %w", err)
}

// Load server CA
caCert, err := os.ReadFile("credentials/keytest.crt")
if err != nil {
return nil, fmt.Errorf("failed to load server CA certificate: %w", err)
}

caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(caCert) {
return nil, fmt.Errorf("failed to append CA certificate")
}

tlsConfig := &tls.Config{
InsecureSkipVerify: true,
RootCAs: caCertPool,
Certificates: []tls.Certificate{clientCert},
MinVersion: tls.VersionTLS12,
}

creds := credentials.NewTLS(tlsConfig)
return grpc.DialerCredentials(creds), nil
}

func stringPtr(s string) *string {
return &s
}
13 changes: 11 additions & 2 deletions new_samples/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"go.uber.org/cadence/worker"
"go.uber.org/cadence/workflow"
"go.uber.org/yarpc"
"go.uber.org/yarpc/peer"
yarpchostport "go.uber.org/yarpc/peer/hostport"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -69,11 +71,18 @@ func StartWorker() {

}

func BuildCadenceClient() workflowserviceclient.Interface {
func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface {
grpcTransport := grpc.NewTransport()
myChooser := peer.NewSingle(
yarpchostport.Identify(HostPort),
grpcTransport.NewDialer(dialOptions...),
)
outbound := grpcTransport.NewOutbound(myChooser)

dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: ClientName,
Outbounds: yarpc.Outbounds{
CadenceService: {Unary: grpc.NewTransport().NewSingleOutbound(HostPort)},
CadenceService: {Unary: outbound},
},
})
if err := dispatcher.Start(); err != nil {
Expand Down
Loading