12 changes: 12 additions & 0 deletions Makefile
@@ -36,6 +36,8 @@ PROGS = helloworld \
signalcounter \
sideeffect \
sleep \
dataconverter \
tls \

TEST_ARG ?= -race -v -timeout 5m
BUILD := ./build
@@ -70,6 +72,8 @@ TEST_DIRS=./cmd/samples/cron \
./cmd/samples/recipes/sideeffect \
./cmd/samples/recipes/signalcounter \
./cmd/samples/recipes/sleep \
./cmd/samples/recipes/dataconverter \
./cmd/samples/recipes/tls \
./cmd/samples/recovery \
./cmd/samples/pso \

@@ -181,6 +185,12 @@ sideeffect:
versioning:
go build -o bin/versioning cmd/samples/recipes/versioning/*.go

dataconverter:
go build -o bin/dataconverter cmd/samples/recipes/dataconverter/*.go

tls:
go build -o bin/tls cmd/samples/recipes/tls/*.go

bins: helloworld \
versioning \
delaystart \
@@ -213,6 +223,8 @@ bins: helloworld \
signalcounter \
sideeffect \
sleep \
dataconverter \
tls \

test: bins
@rm -f test
93 changes: 87 additions & 6 deletions cmd/samples/common/factory.go
@@ -1,7 +1,10 @@
package common

import (
"crypto/tls"
"crypto/x509"
"errors"
"io/ioutil"

"github.com/opentracing/opentracing-go"
"github.com/uber-go/tally"
@@ -12,8 +15,11 @@ import (
"go.uber.org/cadence/encoded"
"go.uber.org/cadence/workflow"
"go.uber.org/yarpc"
"go.uber.org/yarpc/peer"
"go.uber.org/yarpc/peer/hostport"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/zap"
"google.golang.org/grpc/credentials"
)

const (
@@ -32,6 +38,10 @@ type WorkflowClientBuilder struct {
ctxProps []workflow.ContextPropagator
dataConverter encoded.DataConverter
tracer opentracing.Tracer
tlsConfig *tls.Config
clientCertPath string
clientKeyPath string
caCertPath string
}

// NewBuilder creates a new WorkflowClientBuilder
@@ -89,6 +99,20 @@ func (b *WorkflowClientBuilder) SetTracer(tracer opentracing.Tracer) *WorkflowClientBuilder {
return b
}

// SetTLSConfig sets the TLS configuration for the builder
func (b *WorkflowClientBuilder) SetTLSConfig(tlsConfig *tls.Config) *WorkflowClientBuilder {
b.tlsConfig = tlsConfig
return b
}

// SetTLSCertificates sets the TLS certificate paths for the builder
func (b *WorkflowClientBuilder) SetTLSCertificates(clientCertPath, clientKeyPath, caCertPath string) *WorkflowClientBuilder {
b.clientCertPath = clientCertPath
b.clientKeyPath = clientKeyPath
b.caCertPath = caCertPath
return b
}

// BuildCadenceClient builds a client to cadence service
func (b *WorkflowClientBuilder) BuildCadenceClient() (client.Client, error) {
service, err := b.BuildServiceClient()
@@ -163,12 +187,42 @@ func (b *WorkflowClientBuilder) build() error {
zap.String("ServiceName", _cadenceFrontendService),
zap.String("HostPort", b.hostPort))

b.dispatcher = yarpc.NewDispatcher(yarpc.Config{
Name: _cadenceClientName,
Outbounds: yarpc.Outbounds{
_cadenceFrontendService: {Unary: grpc.NewTransport().NewSingleOutbound(b.hostPort)},
},
})
// Check if TLS is configured
if b.tlsConfig != nil || (b.clientCertPath != "" && b.clientKeyPath != "" && b.caCertPath != "") {
// Build TLS configuration if certificate paths are provided but tlsConfig is not
if b.tlsConfig == nil {
tlsConfig, err := b.buildTLSConfig()
if err != nil {
return err
}
b.tlsConfig = tlsConfig
}

// Create TLS-enabled gRPC transport
grpcTransport := grpc.NewTransport()
var dialOptions []grpc.DialOption

creds := credentials.NewTLS(b.tlsConfig)
dialOptions = append(dialOptions, grpc.DialerCredentials(creds))

dialer := grpcTransport.NewDialer(dialOptions...)
outbound := grpcTransport.NewOutbound(peer.NewSingle(hostport.PeerIdentifier(b.hostPort), dialer))

b.dispatcher = yarpc.NewDispatcher(yarpc.Config{
Name: _cadenceClientName,
Outbounds: yarpc.Outbounds{
_cadenceFrontendService: {Unary: outbound},
},
})
} else {
// Create standard non-TLS dispatcher
b.dispatcher = yarpc.NewDispatcher(yarpc.Config{
Name: _cadenceClientName,
Outbounds: yarpc.Outbounds{
_cadenceFrontendService: {Unary: grpc.NewTransport().NewSingleOutbound(b.hostPort)},
},
})
}

if b.dispatcher != nil {
if err := b.dispatcher.Start(); err != nil {
@@ -178,3 +232,30 @@ func (b *WorkflowClientBuilder) build() error {

return nil
}

// buildTLSConfig creates a TLS configuration from certificate paths
func (b *WorkflowClientBuilder) buildTLSConfig() (*tls.Config, error) {
// Present a client certificate for mutual TLS (if enabled on the server)
clientCert, err := tls.LoadX509KeyPair(b.clientCertPath, b.clientKeyPath)
if err != nil {
b.Logger.Error("Failed to load client certificate", zap.Error(err))
return nil, err
}

// Load the server CA certificate
caCert, err := ioutil.ReadFile(b.caCertPath)
if err != nil {
b.Logger.Error("Failed to load server CA certificate", zap.Error(err))
return nil, err
}
caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(caCert) {
return nil, errors.New("failed to parse server CA certificate")
}

// Verify the server certificate against the configured CA instead of
// skipping verification
tlsConfig := &tls.Config{
RootCAs: caCertPool,
Certificates: []tls.Certificate{clientCert},
}

return tlsConfig, nil
}
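
For reference, a minimal sketch of how a host application might drive these new TLS options. It assumes the builder's existing `NewBuilder`, `SetHostPort`, and `SetDomain` helpers from this package; the host/port and certificate paths are placeholders:

```go
package main

import (
	"go.uber.org/zap"

	"github.com/uber-common/cadence-samples/cmd/samples/common"
)

func main() {
	logger, _ := zap.NewDevelopment()

	// Placeholder endpoint and PEM paths; substitute your own.
	builder := common.NewBuilder(logger).
		SetHostPort("cadence-frontend.example.com:7833").
		SetDomain("samples-domain").
		SetTLSCertificates("certs/client.pem", "certs/client.key", "certs/ca.pem")

	cadenceClient, err := builder.BuildCadenceClient()
	if err != nil {
		logger.Fatal("Failed to build cadence client", zap.Error(err))
	}
	_ = cadenceClient // use the client to start or signal workflows
}
```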
74 changes: 74 additions & 0 deletions cmd/samples/recipes/dataconverter/README.md
@@ -0,0 +1,74 @@
# Data Converter Sample

This sample demonstrates how to use a custom data converter in Cadence workflows, here with compression added. The data converter is responsible for serializing and deserializing workflow inputs, outputs, and activity parameters; compressing that data saves storage space and bandwidth.

## Sample Description

The sample implements a custom compressed JSON data converter that:
- Serializes workflow inputs and activity parameters to JSON format
- Compresses the JSON data using gzip compression to reduce size
- Decompresses and deserializes workflow outputs and activity results from JSON format
- Provides significant storage and bandwidth savings for large payloads
- Demonstrates advanced data converter patterns for production use cases
- Shows real-time compression statistics and size comparisons

The sample includes two workflows:
1. **Simple Workflow**: Processes a basic `MyPayload` struct
2. **Large Payload Workflow**: Processes a complex `LargePayload` with nested objects, arrays, and extensive data to demonstrate compression benefits

All data is automatically compressed during serialization and decompressed during deserialization, with compression statistics displayed at runtime.

## Key Components

- **Custom Data Converter**: `compressedJSONDataConverter` implements the `encoded.DataConverter` interface with gzip compression
- **Simple Workflow**: `dataConverterWorkflow` demonstrates basic payload processing with compression
- **Large Payload Workflow**: `largeDataConverterWorkflow` demonstrates processing complex data structures with compression
- **Activities**: `dataConverterActivity` and `largeDataConverterActivity` process different payload types
- **Large Payload Generator**: `CreateLargePayload()` creates realistic complex data for compression demonstration
- **Compression Statistics**: `GetPayloadSizeInfo()` shows before/after compression metrics
- **Tests**: Includes unit tests for both simple and large payload workflows
- **Compression**: Automatic gzip compression/decompression for all workflow data

## Steps to Run Sample

1. You need a cadence service running. See details in cmd/samples/README.md

2. Run the following command to start the worker:
```
./bin/dataconverter -m worker
```

3. Run the following command to execute the workflow:
```
./bin/dataconverter -m trigger
```

You should see:
- Compression statistics showing original vs compressed data sizes
- Workflow logs showing the processing of large payloads
- Activity execution logs with payload information
- Final workflow completion with compression benefits noted

## Customization

To implement your own data converter with compression or other features:
1. Create a struct that implements the `encoded.DataConverter` interface
2. Implement the `ToData` method for serialization and compression
3. Implement the `FromData` method for decompression and deserialization
4. Register the converter in the worker options, as shown in the sketch below
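
A minimal sketch of step 4, assuming it lives alongside the converter in this recipe; the service client is whatever your host application already constructs, and the domain and task list names are placeholders:

```go
package main

import (
	"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
	"go.uber.org/cadence/worker"
	"go.uber.org/zap"
)

// startWorker wires the custom converter into the worker options so every
// workflow and activity payload on this task list passes through it.
func startWorker(service workflowserviceclient.Interface, logger *zap.Logger) error {
	workerOptions := worker.Options{
		Logger:        logger,
		DataConverter: NewCompressedJSONDataConverter(),
	}
	w := worker.New(service, "samples-domain", "dataConverterTaskList", workerOptions)
	return w.Start()
}
```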

This pattern is useful when you need to:
- Reduce storage costs and bandwidth usage with compression
- Use specific serialization formats for performance or compatibility
- Add encryption/decryption to workflow data
- Implement custom compression algorithms (LZ4, Snappy, etc.)
- Support legacy data formats
- Add data validation or transformation during serialization

## Performance Benefits

The compressed data converter provides:
- **Storage Savings**: Typically 60-80% reduction in data size for JSON payloads
- **Bandwidth Reduction**: Lower network transfer costs and faster data transmission
- **Cost Optimization**: Reduced storage costs in Cadence history
- **Scalability**: Better performance with large payloads
71 changes: 71 additions & 0 deletions cmd/samples/recipes/dataconverter/dataconverter.go
@@ -0,0 +1,71 @@
package main

import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"reflect"

"go.uber.org/cadence/encoded"
)

type compressedJSONDataConverter struct{}

func NewCompressedJSONDataConverter() encoded.DataConverter {
return &compressedJSONDataConverter{}
}

func (dc *compressedJSONDataConverter) ToData(value ...interface{}) ([]byte, error) {
// First, serialize to JSON
var jsonBuf bytes.Buffer
enc := json.NewEncoder(&jsonBuf)
for i, obj := range value {
err := enc.Encode(obj)
if err != nil {
return nil, fmt.Errorf("unable to encode argument: %d, %v, with error: %v", i, reflect.TypeOf(obj), err)
}
}

// Then compress the JSON data
var compressedBuf bytes.Buffer
gzipWriter := gzip.NewWriter(&compressedBuf)

_, err := gzipWriter.Write(jsonBuf.Bytes())
if err != nil {
return nil, fmt.Errorf("unable to compress data: %v", err)
}

err = gzipWriter.Close()
if err != nil {
return nil, fmt.Errorf("unable to close gzip writer: %v", err)
}

return compressedBuf.Bytes(), nil
}

func (dc *compressedJSONDataConverter) FromData(input []byte, valuePtr ...interface{}) error {
// First, decompress the data
gzipReader, err := gzip.NewReader(bytes.NewBuffer(input))
if err != nil {
return fmt.Errorf("unable to create gzip reader: %v", err)
}
defer gzipReader.Close()

// Read the decompressed JSON data
decompressedData, err := io.ReadAll(gzipReader)
if err != nil {
return fmt.Errorf("unable to decompress data: %v", err)
}

// Then deserialize from JSON
dec := json.NewDecoder(bytes.NewBuffer(decompressedData))
for i, obj := range valuePtr {
err := dec.Decode(obj)
if err != nil {
return fmt.Errorf("unable to decode argument: %d, %v, with error: %v", i, reflect.TypeOf(obj), err)
}
}
return nil
}
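
As a quick sanity check, the converter can be exercised outside of a workflow. This sketch assumes it sits in the same package as the converter above (`fmt` is already imported there); `examplePayload` is a hypothetical stand-in for the sample's real payload types:

```go
// roundtripExample compresses a value with ToData and restores it with
// FromData, reporting the encoded size along the way.
func roundtripExample() error {
	type examplePayload struct {
		Msg   string
		Count int
	}

	dc := NewCompressedJSONDataConverter()

	in := examplePayload{Msg: "hello", Count: 3}
	data, err := dc.ToData(in)
	if err != nil {
		return err
	}

	var out examplePayload
	if err := dc.FromData(data, &out); err != nil {
		return err
	}

	fmt.Printf("encoded %d bytes, roundtrip: %+v\n", len(data), out)
	return nil
}
```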