diff --git a/Makefile b/Makefile index 469f3fb..502f2bf 100644 --- a/Makefile +++ b/Makefile @@ -36,6 +36,8 @@ PROGS = helloworld \ signalcounter \ sideeffect \ sleep \ + dataconverter \ + tls \ TEST_ARG ?= -race -v -timeout 5m BUILD := ./build @@ -70,6 +72,8 @@ TEST_DIRS=./cmd/samples/cron \ ./cmd/samples/recipes/sideeffect \ ./cmd/samples/recipes/signalcounter \ ./cmd/samples/recipes/sleep \ + ./cmd/samples/recipes/dataconverter \ + ./cmd/samples/recipes/tls \ ./cmd/samples/recovery \ ./cmd/samples/pso \ @@ -181,6 +185,12 @@ sideeffect: versioning: go build -o bin/versioning cmd/samples/recipes/versioning/*.go +dataconverter: + go build -o bin/dataconverter cmd/samples/recipes/dataconverter/*.go + +tls: + go build -o bin/tls cmd/samples/recipes/tls/*.go + bins: helloworld \ versioning \ delaystart \ @@ -213,6 +223,8 @@ bins: helloworld \ signalcounter \ sideeffect \ sleep \ + dataconverter \ + tls \ test: bins @rm -f test diff --git a/cmd/samples/common/factory.go b/cmd/samples/common/factory.go index 2b50fd8..68d837d 100644 --- a/cmd/samples/common/factory.go +++ b/cmd/samples/common/factory.go @@ -1,7 +1,10 @@ package common import ( + "crypto/tls" + "crypto/x509" "errors" + "io/ioutil" "github.com/opentracing/opentracing-go" "github.com/uber-go/tally" @@ -12,8 +15,11 @@ import ( "go.uber.org/cadence/encoded" "go.uber.org/cadence/workflow" "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + "go.uber.org/yarpc/peer/hostport" "go.uber.org/yarpc/transport/grpc" "go.uber.org/zap" + "google.golang.org/grpc/credentials" ) const ( @@ -32,6 +38,10 @@ type WorkflowClientBuilder struct { ctxProps []workflow.ContextPropagator dataConverter encoded.DataConverter tracer opentracing.Tracer + tlsConfig *tls.Config + clientCertPath string + clientKeyPath string + caCertPath string } // NewBuilder creates a new WorkflowClientBuilder @@ -89,6 +99,20 @@ func (b *WorkflowClientBuilder) SetTracer(tracer opentracing.Tracer) *WorkflowCl return b } +// SetTLSConfig sets the TLS configuration for the builder +func (b *WorkflowClientBuilder) SetTLSConfig(tlsConfig *tls.Config) *WorkflowClientBuilder { + b.tlsConfig = tlsConfig + return b +} + +// SetTLSCertificates sets the TLS certificate paths for the builder +func (b *WorkflowClientBuilder) SetTLSCertificates(clientCertPath, clientKeyPath, caCertPath string) *WorkflowClientBuilder { + b.clientCertPath = clientCertPath + b.clientKeyPath = clientKeyPath + b.caCertPath = caCertPath + return b +} + // BuildCadenceClient builds a client to cadence service func (b *WorkflowClientBuilder) BuildCadenceClient() (client.Client, error) { service, err := b.BuildServiceClient() @@ -163,12 +187,42 @@ func (b *WorkflowClientBuilder) build() error { zap.String("ServiceName", _cadenceFrontendService), zap.String("HostPort", b.hostPort)) - b.dispatcher = yarpc.NewDispatcher(yarpc.Config{ - Name: _cadenceClientName, - Outbounds: yarpc.Outbounds{ - _cadenceFrontendService: {Unary: grpc.NewTransport().NewSingleOutbound(b.hostPort)}, - }, - }) + // Check if TLS is configured + if b.tlsConfig != nil || (b.clientCertPath != "" && b.clientKeyPath != "" && b.caCertPath != "") { + // Build TLS configuration if certificate paths are provided but tlsConfig is not + if b.tlsConfig == nil { + tlsConfig, err := b.buildTLSConfig() + if err != nil { + return err + } + b.tlsConfig = tlsConfig + } + + // Create TLS-enabled gRPC transport + grpcTransport := grpc.NewTransport() + var dialOptions []grpc.DialOption + + creds := credentials.NewTLS(b.tlsConfig) + dialOptions = append(dialOptions, 
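+			// DialerCredentials hands the TLS credentials to the underlying gRPC
+			// dialer, so the TLS handshake runs when this outbound connects.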
grpc.DialerCredentials(creds))
+
+		dialer := grpcTransport.NewDialer(dialOptions...)
+		outbound := grpcTransport.NewOutbound(peer.NewSingle(hostport.PeerIdentifier(b.hostPort), dialer))
+
+		b.dispatcher = yarpc.NewDispatcher(yarpc.Config{
+			Name: _cadenceClientName,
+			Outbounds: yarpc.Outbounds{
+				_cadenceFrontendService: {Unary: outbound},
+			},
+		})
+	} else {
+		// Create standard non-TLS dispatcher
+		b.dispatcher = yarpc.NewDispatcher(yarpc.Config{
+			Name: _cadenceClientName,
+			Outbounds: yarpc.Outbounds{
+				_cadenceFrontendService: {Unary: grpc.NewTransport().NewSingleOutbound(b.hostPort)},
+			},
+		})
+	}

 	if b.dispatcher != nil {
 		if err := b.dispatcher.Start(); err != nil {
@@ -178,3 +232,30 @@ func (b *WorkflowClientBuilder) build() error {

 	return nil
 }
+
+// buildTLSConfig creates a TLS configuration from certificate paths
+func (b *WorkflowClientBuilder) buildTLSConfig() (*tls.Config, error) {
+	// Present client cert for mutual TLS (if enabled on server)
+	clientCert, err := tls.LoadX509KeyPair(b.clientCertPath, b.clientKeyPath)
+	if err != nil {
+		b.Logger.Error("Failed to load client certificate", zap.Error(err))
+		return nil, err
+	}
+
+	// Load server CA
+	caCert, err := ioutil.ReadFile(b.caCertPath)
+	if err != nil {
+		b.Logger.Error("Failed to load server CA certificate", zap.Error(err))
+		return nil, err
+	}
+	caCertPool := x509.NewCertPool()
+	if !caCertPool.AppendCertsFromPEM(caCert) {
+		return nil, errors.New("failed to parse server CA certificate")
+	}
+
+	tlsConfig := &tls.Config{
+		// Sample only: hostname verification is skipped because the test
+		// certificates carry no SANs. Do not set this in production.
+		InsecureSkipVerify: true,
+		RootCAs:            caCertPool,
+		Certificates:       []tls.Certificate{clientCert},
+	}
+
+	return tlsConfig, nil
+}
diff --git a/cmd/samples/recipes/dataconverter/README.md b/cmd/samples/recipes/dataconverter/README.md
new file mode 100644
index 0000000..599b688
--- /dev/null
+++ b/cmd/samples/recipes/dataconverter/README.md
@@ -0,0 +1,74 @@
+# Data Converter Sample
+
+This sample demonstrates how to use a custom data converter in Cadence workflows, here with gzip compression built in. The data converter is responsible for serializing and deserializing workflow inputs, outputs, and activity parameters; compressing that data saves storage space and bandwidth.
+
+## Sample Description
+
+The sample implements a custom compressed JSON data converter that:
+- Serializes workflow inputs and activity parameters to JSON
+- Compresses the JSON data with gzip to reduce its size
+- Decompresses and deserializes workflow outputs and activity results
+- Provides significant storage and bandwidth savings for large payloads
+- Demonstrates a data converter pattern suitable for production use
+- Shows compression statistics and size comparisons at runtime
+
+The sample workflow, **largeDataConverterWorkflow**, processes a complex `LargePayload` with nested objects, arrays, and extensive repeated text to demonstrate the compression benefits. (workflow.go also defines a minimal `MyPayload` struct you can use as a starting point for simpler payloads.)
+
+All data is automatically compressed during serialization and decompressed during deserialization, with compression statistics displayed at runtime.
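+
+A condensed sketch of the serialization half, taken from this sample's dataconverter.go (imports and error wrapping trimmed for brevity):
+
+```
+func (dc *compressedJSONDataConverter) ToData(value ...interface{}) ([]byte, error) {
+	// JSON-encode all values into one buffer...
+	var jsonBuf bytes.Buffer
+	enc := json.NewEncoder(&jsonBuf)
+	for _, obj := range value {
+		if err := enc.Encode(obj); err != nil {
+			return nil, err
+		}
+	}
+	// ...then gzip the encoded bytes.
+	var compressed bytes.Buffer
+	w := gzip.NewWriter(&compressed)
+	if _, err := w.Write(jsonBuf.Bytes()); err != nil {
+		return nil, err
+	}
+	if err := w.Close(); err != nil { // Close flushes the gzip footer
+		return nil, err
+	}
+	return compressed.Bytes(), nil
+}
+```
+
+`FromData` reverses the steps: decompress with `gzip.NewReader`, then decode with `json.NewDecoder`.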
+
+## Key Components
+
+- **Custom Data Converter**: `compressedJSONDataConverter` implements the `encoded.DataConverter` interface with gzip compression
+- **Large Payload Workflow**: `largeDataConverterWorkflow` demonstrates processing complex data structures with compression
+- **Activity**: `largeDataConverterActivity` processes the large payload
+- **Large Payload Generator**: `CreateLargePayload()` creates realistic complex data for the compression demonstration
+- **Compression Statistics**: `GetPayloadSizeInfo()` reports before/after compression metrics
+- **Tests**: Includes a unit test for the large payload workflow
+- **Compression**: Automatic gzip compression/decompression for all workflow data
+
+## Steps to Run Sample
+
+1. You need a cadence service running. See details in cmd/samples/README.md
+
+2. Run the following command to start the worker:
+   ```
+   ./bin/dataconverter -m worker
+   ```
+
+3. Run the following command to execute the workflow:
+   ```
+   ./bin/dataconverter -m trigger
+   ```
+
+You should see:
+- Compression statistics showing original vs compressed data sizes
+- Workflow logs showing the processing of large payloads
+- Activity execution logs with payload information
+- Final workflow completion with compression benefits noted
+
+## Customization
+
+To implement your own data converter with compression or other features:
+1. Create a struct that implements the `encoded.DataConverter` interface
+2. Implement the `ToData` method for serialization and compression (see the condensed sketch above)
+3. Implement the `FromData` method for decompression and deserialization
+4. Register the converter in the worker options
+
+This pattern is useful when you need to:
+- Reduce storage costs and bandwidth usage with compression
+- Use specific serialization formats for performance or compatibility
+- Add encryption/decryption to workflow data
+- Implement custom compression algorithms (LZ4, Snappy, etc.)
+- Support legacy data formats +- Add data validation or transformation during serialization + +## Performance Benefits + +The compressed data converter provides: +- **Storage Savings**: Typically 60-80% reduction in data size for JSON payloads +- **Bandwidth Reduction**: Lower network transfer costs and faster data transmission +- **Cost Optimization**: Reduced storage costs in Cadence history +- **Scalability**: Better performance with large payloads \ No newline at end of file diff --git a/cmd/samples/recipes/dataconverter/dataconverter.go b/cmd/samples/recipes/dataconverter/dataconverter.go new file mode 100644 index 0000000..326eef3 --- /dev/null +++ b/cmd/samples/recipes/dataconverter/dataconverter.go @@ -0,0 +1,71 @@ +package main + +import ( + "bytes" + "compress/gzip" + "encoding/json" + "fmt" + "io" + "reflect" + + "go.uber.org/cadence/encoded" +) + +type compressedJSONDataConverter struct{} + +func NewCompressedJSONDataConverter() encoded.DataConverter { + return &compressedJSONDataConverter{} +} + +func (dc *compressedJSONDataConverter) ToData(value ...interface{}) ([]byte, error) { + // First, serialize to JSON + var jsonBuf bytes.Buffer + enc := json.NewEncoder(&jsonBuf) + for i, obj := range value { + err := enc.Encode(obj) + if err != nil { + return nil, fmt.Errorf("unable to encode argument: %d, %v, with error: %v", i, reflect.TypeOf(obj), err) + } + } + + // Then compress the JSON data + var compressedBuf bytes.Buffer + gzipWriter := gzip.NewWriter(&compressedBuf) + + _, err := gzipWriter.Write(jsonBuf.Bytes()) + if err != nil { + return nil, fmt.Errorf("unable to compress data: %v", err) + } + + err = gzipWriter.Close() + if err != nil { + return nil, fmt.Errorf("unable to close gzip writer: %v", err) + } + + return compressedBuf.Bytes(), nil +} + +func (dc *compressedJSONDataConverter) FromData(input []byte, valuePtr ...interface{}) error { + // First, decompress the data + gzipReader, err := gzip.NewReader(bytes.NewBuffer(input)) + if err != nil { + return fmt.Errorf("unable to create gzip reader: %v", err) + } + defer gzipReader.Close() + + // Read the decompressed JSON data + decompressedData, err := io.ReadAll(gzipReader) + if err != nil { + return fmt.Errorf("unable to decompress data: %v", err) + } + + // Then deserialize from JSON + dec := json.NewDecoder(bytes.NewBuffer(decompressedData)) + for i, obj := range valuePtr { + err := dec.Decode(obj) + if err != nil { + return fmt.Errorf("unable to decode argument: %d, %v, with error: %v", i, reflect.TypeOf(obj), err) + } + } + return nil +} diff --git a/cmd/samples/recipes/dataconverter/large_payload.go b/cmd/samples/recipes/dataconverter/large_payload.go new file mode 100644 index 0000000..fce629a --- /dev/null +++ b/cmd/samples/recipes/dataconverter/large_payload.go @@ -0,0 +1,223 @@ +package main + +import ( + "encoding/json" + "fmt" + "strings" + + "go.uber.org/cadence/encoded" +) + +// LargePayload represents a complex data structure with nested objects and arrays +type LargePayload struct { + ID string `json:"id"` + Name string `json:"name"` + Description string `json:"description"` + Metadata map[string]interface{} `json:"metadata"` + Items []Item `json:"items"` + Config Config `json:"config"` + History []HistoryEntry `json:"history"` + Tags []string `json:"tags"` + Stats Statistics `json:"statistics"` +} + +// Item represents a single item in the payload +type Item struct { + ItemID string `json:"item_id"` + Title string `json:"title"` + Description string `json:"description"` + Price float64 `json:"price"` 
+ Categories []string `json:"categories"` + Attributes map[string]string `json:"attributes"` + Reviews []Review `json:"reviews"` + Inventory Inventory `json:"inventory"` +} + +// Review represents a product review +type Review struct { + ReviewID string `json:"review_id"` + UserID string `json:"user_id"` + Rating int `json:"rating"` + Comment string `json:"comment"` + Helpful int `json:"helpful_votes"` + NotHelpful int `json:"not_helpful_votes"` + Date string `json:"date"` + Verified bool `json:"verified_purchase"` + Score float64 `json:"score"` +} + +// Inventory represents inventory information +type Inventory struct { + Quantity int `json:"quantity"` + Location string `json:"location"` + LastUpdated string `json:"last_updated"` + Status string `json:"status"` +} + +// Config represents configuration settings +type Config struct { + Version string `json:"version"` + Environment string `json:"environment"` + Settings map[string]string `json:"settings"` + Features []string `json:"features"` + Limits Limits `json:"limits"` +} + +// Limits represents system limits +type Limits struct { + MaxItems int `json:"max_items"` + MaxRequests int `json:"max_requests_per_minute"` + MaxFileSize int `json:"max_file_size_mb"` + MaxUsers int `json:"max_concurrent_users"` + TimeoutSecs int `json:"timeout_seconds"` +} + +// HistoryEntry represents a historical event +type HistoryEntry struct { + EventID string `json:"event_id"` + Timestamp string `json:"timestamp"` + EventType string `json:"event_type"` + UserID string `json:"user_id"` + Details map[string]string `json:"details"` + Severity string `json:"severity"` +} + +// Statistics represents statistical data +type Statistics struct { + TotalItems int `json:"total_items"` + TotalUsers int `json:"total_users"` + AverageRating float64 `json:"average_rating"` + TotalRevenue float64 `json:"total_revenue"` + ActiveOrders int `json:"active_orders"` + CompletionRate float64 `json:"completion_rate"` +} + +// CreateLargePayload creates a sample large payload with realistic data +func CreateLargePayload() LargePayload { + // Create a large description with repeated text to demonstrate compression + largeDescription := strings.Repeat("This is a comprehensive product catalog containing thousands of items with detailed descriptions, specifications, and user reviews. Each item includes pricing information, inventory status, and customer feedback. The catalog is designed to provide complete information for customers making purchasing decisions. ", 50) + + // Create sample items + items := make([]Item, 100) + for i := 0; i < 100; i++ { + reviews := make([]Review, 25) + for j := 0; j < 25; j++ { + reviews[j] = Review{ + ReviewID: fmt.Sprintf("review_%d_%d", i, j), + UserID: fmt.Sprintf("user_%d", j), + Rating: 1 + (j % 5), + Comment: strings.Repeat("This is a detailed customer review with comprehensive feedback about the product quality, delivery experience, and overall satisfaction. The customer provides specific details about their experience. ", 3), + Helpful: j * 2, + NotHelpful: j, + Date: "2024-01-15T10:30:00Z", + Verified: j%2 == 0, + Score: float64(1+(j%5)) + float64(j%10)/10.0, + } + } + + attributes := make(map[string]string) + for k := 0; k < 20; k++ { + attributes[fmt.Sprintf("attr_%d", k)] = strings.Repeat("This is a detailed attribute description with comprehensive information about the product specification. 
", 2) + } + + items[i] = Item{ + ItemID: fmt.Sprintf("item_%d", i), + Title: fmt.Sprintf("High-Quality Product %d with Advanced Features", i), + Description: strings.Repeat("This is a premium product with exceptional quality and advanced features designed for professional use. It includes comprehensive documentation and support. ", 10), + Price: float64(100+i*10) + float64(i%100)/100.0, + Categories: []string{"Electronics", "Professional", "Premium", "Advanced"}, + Attributes: attributes, + Reviews: reviews, + Inventory: Inventory{ + Quantity: 100 + i, + Location: fmt.Sprintf("Warehouse %d", i%5), + LastUpdated: "2024-01-15T10:30:00Z", + Status: "In Stock", + }, + } + } + + // Create history entries + history := make([]HistoryEntry, 50) + for i := 0; i < 50; i++ { + details := make(map[string]string) + for j := 0; j < 10; j++ { + details[fmt.Sprintf("detail_%d", j)] = strings.Repeat("This is a detailed event description with comprehensive information about the system event and its impact. ", 2) + } + + history[i] = HistoryEntry{ + EventID: fmt.Sprintf("event_%d", i), + Timestamp: "2024-01-15T10:30:00Z", + EventType: "system_update", + UserID: fmt.Sprintf("admin_%d", i%5), + Details: details, + Severity: "medium", + } + } + + // Create metadata + metadata := make(map[string]interface{}) + for i := 0; i < 30; i++ { + metadata[fmt.Sprintf("meta_key_%d", i)] = strings.Repeat("This is comprehensive metadata information with detailed descriptions and specifications. ", 5) + } + + return LargePayload{ + ID: "large_payload_001", + Name: "Comprehensive Product Catalog", + Description: largeDescription, + Metadata: metadata, + Items: items, + Config: Config{ + Version: "2.1.0", + Environment: "production", + Settings: map[string]string{ + "cache_enabled": "true", + "compression_level": "high", + "timeout": "30s", + "max_connections": "1000", + "retry_attempts": "3", + }, + Features: []string{"advanced_search", "real_time_updates", "analytics", "reporting", "integration"}, + Limits: Limits{ + MaxItems: 10000, + MaxRequests: 1000, + MaxFileSize: 100, + MaxUsers: 5000, + TimeoutSecs: 30, + }, + }, + History: history, + Tags: []string{"catalog", "products", "inventory", "analytics", "reporting", "integration", "api", "dashboard"}, + Stats: Statistics{ + TotalItems: 10000, + TotalUsers: 5000, + AverageRating: 4.2, + TotalRevenue: 1250000.50, + ActiveOrders: 250, + CompletionRate: 98.5, + }, + } +} + +// GetPayloadSizeInfo returns information about the payload size before and after compression +func GetPayloadSizeInfo(payload LargePayload, converter encoded.DataConverter) (int, int, float64, error) { + // Serialize to JSON to get original size + jsonData, err := json.Marshal(payload) + if err != nil { + return 0, 0, 0, fmt.Errorf("failed to marshal payload: %v", err) + } + originalSize := len(jsonData) + + // Compress using our converter + compressedData, err := converter.ToData(payload) + if err != nil { + return 0, 0, 0, fmt.Errorf("failed to compress payload: %v", err) + } + compressedSize := len(compressedData) + + // Calculate compression ratio + compressionRatio := float64(compressedSize) / float64(originalSize) + compressionPercentage := (1.0 - compressionRatio) * 100 + + return originalSize, compressedSize, compressionPercentage, nil +} diff --git a/cmd/samples/recipes/dataconverter/main.go b/cmd/samples/recipes/dataconverter/main.go new file mode 100644 index 0000000..fa2af20 --- /dev/null +++ b/cmd/samples/recipes/dataconverter/main.go @@ -0,0 +1,80 @@ +package main + +import ( + "flag" + "fmt" 
+ "time" + + "github.com/pborman/uuid" + "go.uber.org/cadence/client" + "go.uber.org/cadence/worker" + + "github.com/uber-common/cadence-samples/cmd/samples/common" +) + +const ( + ApplicationName = "dataConverterTaskList" +) + +func startWorkers(h *common.SampleHelper) { + workerOptions := worker.Options{ + MetricsScope: h.WorkerMetricScope, + Logger: h.Logger, + FeatureFlags: client.FeatureFlags{ + WorkflowExecutionAlreadyCompletedErrorEnabled: true, + }, + DataConverter: NewCompressedJSONDataConverter(), + } + h.StartWorkers(h.Config.DomainName, ApplicationName, workerOptions) +} + +func startWorkflow(h *common.SampleHelper) { + // Create a large payload to demonstrate compression benefits + largeInput := CreateLargePayload() + + // Show compression statistics before starting workflow + converter := NewCompressedJSONDataConverter() + originalSize, compressedSize, compressionPercentage, err := GetPayloadSizeInfo(largeInput, converter) + if err != nil { + fmt.Printf("Error calculating compression stats: %v\n", err) + } else { + fmt.Printf("=== Compression Statistics ===\n") + fmt.Printf("Original JSON size: %d bytes (%.2f KB)\n", originalSize, float64(originalSize)/1024.0) + fmt.Printf("Compressed size: %d bytes (%.2f KB)\n", compressedSize, float64(compressedSize)/1024.0) + fmt.Printf("Compression ratio: %.2f%% reduction\n", compressionPercentage) + fmt.Printf("Space saved: %d bytes (%.2f KB)\n", originalSize-compressedSize, float64(originalSize-compressedSize)/1024.0) + fmt.Printf("=============================\n\n") + } + + workflowOptions := client.StartWorkflowOptions{ + ID: "dataconverter_" + uuid.New(), + TaskList: ApplicationName, + ExecutionStartToCloseTimeout: time.Minute, + DecisionTaskStartToCloseTimeout: time.Minute, + } + h.StartWorkflow(workflowOptions, LargeDataConverterWorkflowName, largeInput) +} + +func registerWorkflowAndActivity(h *common.SampleHelper) { + h.RegisterWorkflowWithAlias(largeDataConverterWorkflow, LargeDataConverterWorkflowName) + h.RegisterActivity(largeDataConverterActivity) +} + +func main() { + var mode string + flag.StringVar(&mode, "m", "trigger", "Mode is worker or trigger.") + flag.Parse() + + var h common.SampleHelper + h.DataConverter = NewCompressedJSONDataConverter() + h.SetupServiceConfig() + + switch mode { + case "worker": + registerWorkflowAndActivity(&h) + startWorkers(&h) + select {} + case "trigger": + startWorkflow(&h) + } +} diff --git a/cmd/samples/recipes/dataconverter/workflow.go b/cmd/samples/recipes/dataconverter/workflow.go new file mode 100644 index 0000000..2edc1a5 --- /dev/null +++ b/cmd/samples/recipes/dataconverter/workflow.go @@ -0,0 +1,54 @@ +package main + +import ( + "context" + "time" + + "go.uber.org/cadence/activity" + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +type MyPayload struct { + Msg string + Count int +} + +// LargeDataConverterWorkflowName is the workflow name for large payload processing +const LargeDataConverterWorkflowName = "largeDataConverterWorkflow" + +// largeDataConverterWorkflow demonstrates processing large payloads with compression +func largeDataConverterWorkflow(ctx workflow.Context, input LargePayload) (LargePayload, error) { + logger := workflow.GetLogger(ctx) + logger.Info("Large payload workflow started", zap.String("payload_id", input.ID)) + logger.Info("Processing large payload with compression", zap.Int("items_count", len(input.Items))) + + activityOptions := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + } + ctx = 
workflow.WithActivityOptions(ctx, activityOptions) + + var result LargePayload + err := workflow.ExecuteActivity(ctx, largeDataConverterActivity, input).Get(ctx, &result) + if err != nil { + logger.Error("Large payload activity failed", zap.Error(err)) + return LargePayload{}, err + } + + logger.Info("Large payload workflow completed", zap.String("result_id", result.ID)) + logger.Info("Note: All large payload data was automatically compressed/decompressed using gzip compression") + return result, nil +} + +func largeDataConverterActivity(ctx context.Context, input LargePayload) (LargePayload, error) { + logger := activity.GetLogger(ctx) + logger.Info("Large payload activity received input", zap.String("payload_id", input.ID), zap.Int("items_count", len(input.Items))) + + // Process the large payload (in a real scenario, this might involve data transformation, validation, etc.) + input.Name = input.Name + " (Processed)" + input.Stats.TotalItems = len(input.Items) + + logger.Info("Large payload activity completed", zap.String("result_id", input.ID)) + return input, nil +} diff --git a/cmd/samples/recipes/dataconverter/workflow_test.go b/cmd/samples/recipes/dataconverter/workflow_test.go new file mode 100644 index 0000000..4313cff --- /dev/null +++ b/cmd/samples/recipes/dataconverter/workflow_test.go @@ -0,0 +1,38 @@ +package main + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/encoded" + "go.uber.org/cadence/testsuite" + "go.uber.org/cadence/worker" +) + +func Test_LargeDataConverterWorkflow(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + env.RegisterWorkflow(largeDataConverterWorkflow) + env.RegisterActivity(largeDataConverterActivity) + + dataConverter := NewCompressedJSONDataConverter() + workerOptions := worker.Options{ + DataConverter: dataConverter, + } + env.SetWorkerOptions(workerOptions) + + input := CreateLargePayload() + + var activityResult LargePayload + env.SetOnActivityCompletedListener(func(activityInfo *activity.Info, result encoded.Value, err error) { + result.Get(&activityResult) + }) + + env.ExecuteWorkflow(largeDataConverterWorkflow, input) + + require.True(t, env.IsWorkflowCompleted()) + require.NoError(t, env.GetWorkflowError()) + require.Equal(t, "Comprehensive Product Catalog (Processed)", activityResult.Name) + require.Equal(t, 100, activityResult.Stats.TotalItems) +} diff --git a/cmd/samples/recipes/tls/README.md b/cmd/samples/recipes/tls/README.md new file mode 100644 index 0000000..5a69e02 --- /dev/null +++ b/cmd/samples/recipes/tls/README.md @@ -0,0 +1,102 @@ +# TLS Connection Verification Recipe + +This recipe demonstrates how to verify TLS connections between Cadence SDK and frontend using a Cadence workflow, showing both successful and unsuccessful scenarios. + +## What This Recipe Shows + +1. **TLS Workflow Execution** - Orchestrates TLS testing using Cadence workflow and activities +2. **Certificate Management** - Automated certificate creation and cleanup +3. **TLS Connection Testing** - Tests connections with valid certificates +4. **Error Handling** - Tests connections with missing certificates +5. **Standard Connection Fallback** - Tests non-TLS connections + +## Prerequisites + +- Cadence server running on `localhost:7933` +- OpenSSL installed for certificate generation +- Go environment set up + +## Steps to Run Sample + +1. You need a cadence service running. See details in cmd/samples/README.md + +2. 
Run the following command to start the worker: + ``` + ./bin/tls -m worker + ``` + +3. Run the following command to execute the workflow: + ``` + ./bin/tls -m trigger + ``` + +## Workflow Structure + +The recipe follows the standard Cadence worker/trigger pattern: + +- **Worker Mode**: Starts a Cadence worker that can execute the TLS workflow and activities +- **Trigger Mode**: Triggers a new execution of the TLS workflow + +## Activities + +1. **setupCertificatesActivity** - Creates fresh TLS certificates for testing +2. **testTLSConnectionActivity** - Tests TLS connections with provided certificates +3. **testStandardConnectionActivity** - Tests standard non-TLS connections +4. **cleanupCertificatesActivity** - Cleans up generated certificates + +## Expected Workflow Output + +When you run the trigger mode, the workflow will execute and you'll see logs showing: + +``` +INFO TLS Workflow started +INFO Setting up certificates for testing +INFO Certificates setup completed +INFO Testing TLS connection with valid certificates +INFO Valid TLS connection test result: SUCCESS - TLS connection established +INFO Testing TLS connection with missing certificates +INFO Missing certificates test failed as expected +INFO Testing standard non-TLS connection +INFO Standard connection test result: SUCCESS - Standard connection established +INFO Cleaning up certificates +INFO Certificate cleanup completed +INFO TLS Workflow completed successfully +``` + +## Certificate Management + +The workflow automatically: +- Creates a `cmd/samples/recipes/tls/certs/` directory +- Generates CA, server, and client certificates using OpenSSL +- Tests connections with the generated certificates +- Cleans up all certificate files after testing + +Generated certificates include: +- `ca.key` / `ca.crt` - Certificate Authority +- `server.key` / `server.crt` - Server certificates +- `client.key` / `client.crt` - Client certificates + +## Key Features + +- **Workflow Orchestration**: Uses Cadence workflow to coordinate TLS testing +- **Automated Certificate Generation**: No manual certificate setup required +- **Comprehensive Testing**: Tests multiple TLS scenarios in sequence +- **Error Handling**: Graceful handling of connection failures +- **Clean Resource Management**: Automatic cleanup of generated certificates +- **Production Patterns**: Demonstrates proper Cadence workflow and activity patterns + +## Code Structure + +### Main Components +- `main.go` - Worker/trigger entry point following Cadence patterns +- `tls_workflow.go` - Workflow and activity definitions + +### Key Functions +- `tlsWorkflow()` - Main workflow that orchestrates TLS testing +- `setupCertificatesActivity()` - Activity to create test certificates +- `testTLSConnectionActivity()` - Activity to test TLS connections +- `testStandardConnectionActivity()` - Activity to test non-TLS connections +- `cleanupCertificatesActivity()` - Activity to clean up certificates +- `createTLSClient()` - Helper to create TLS-enabled Cadence client + +This recipe demonstrates how to integrate TLS configuration testing into Cadence workflows, making it suitable for production environments where TLS verification is part of automated testing or deployment processes. 
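+
+The mutual-TLS configuration at the heart of `createTLSClient()` (shown in full in main.go) boils down to a few lines; a condensed sketch with error handling trimmed:
+
+```
+clientCert, _ := tls.LoadX509KeyPair(clientCertPath, clientKeyPath)
+caCert, _ := ioutil.ReadFile(caCertPath)
+caPool := x509.NewCertPool()
+caPool.AppendCertsFromPEM(caCert)
+creds := credentials.NewTLS(&tls.Config{
+	RootCAs:            caPool,                        // trust the generated CA
+	Certificates:       []tls.Certificate{clientCert}, // present the client cert (mutual TLS)
+	InsecureSkipVerify: true,                          // sample only: the test certs have no SANs
+})
+dialer := grpc.NewTransport().NewDialer(grpc.DialerCredentials(creds))
+```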
\ No newline at end of file diff --git a/cmd/samples/recipes/tls/main.go b/cmd/samples/recipes/tls/main.go new file mode 100644 index 0000000..d2cd299 --- /dev/null +++ b/cmd/samples/recipes/tls/main.go @@ -0,0 +1,276 @@ +package main + +import ( + "crypto/tls" + "crypto/x509" + "flag" + "fmt" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "time" + + "github.com/pborman/uuid" + "go.uber.org/cadence/client" + "go.uber.org/cadence/worker" + "go.uber.org/zap" + "google.golang.org/grpc/credentials" + + "github.com/uber-common/cadence-samples/cmd/samples/common" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/compatibility" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" +) + +const ( + ApplicationName = "tlsTaskList" + TLSWorkflowName = "tlsWorkflow" +) + +func startWorkers(h *common.SampleHelper) { + workerOptions := worker.Options{ + MetricsScope: h.WorkerMetricScope, + Logger: h.Logger, + FeatureFlags: client.FeatureFlags{ + WorkflowExecutionAlreadyCompletedErrorEnabled: true, + }, + } + h.StartWorkers(h.Config.DomainName, ApplicationName, workerOptions) +} + +func startWorkflow(h *common.SampleHelper) { + workflowOptions := client.StartWorkflowOptions{ + ID: "tls_" + uuid.New(), + TaskList: ApplicationName, + ExecutionStartToCloseTimeout: 5 * time.Minute, + DecisionTaskStartToCloseTimeout: time.Minute, + } + h.StartWorkflow(workflowOptions, TLSWorkflowName) +} + +func registerWorkflowAndActivity(h *common.SampleHelper) { + h.RegisterWorkflowWithAlias(tlsWorkflow, TLSWorkflowName) + h.RegisterActivity(setupCertificatesActivity) + h.RegisterActivity(testTLSConnectionActivity) + h.RegisterActivity(testStandardConnectionActivity) + h.RegisterActivity(cleanupCertificatesActivity) +} + +func main() { + var mode string + flag.StringVar(&mode, "m", "trigger", "Mode is worker or trigger.") + flag.Parse() + + var h common.SampleHelper + h.SetupServiceConfig() + + switch mode { + case "worker": + registerWorkflowAndActivity(&h) + startWorkers(&h) + select {} + case "trigger": + startWorkflow(&h) + } +} + +// setupCertificatesForTesting handles the complete certificate setup workflow for testing +// Returns the certificates directory path on success, empty string on failure +func setupCertificatesForTesting() string { + fmt.Println("Creating fresh TLS certificates...") + certsDir := "cmd/samples/recipes/tls/certs" + + if err := createCertificates(certsDir); err != nil { + fmt.Printf("❌ Failed to create certificates: %v\n", err) + return "" + } + + fmt.Println("✅ Certificates created successfully") + fmt.Println() + return certsDir +} + +// createCertificates creates fresh TLS certificates for testing +func createCertificates(certsDir string) error { + // Ensure certs directory exists + if err := os.MkdirAll(certsDir, 0755); err != nil { + return fmt.Errorf("failed to create certs directory: %v", err) + } + + // Clean up any existing certificates first + cleanupCertificates(certsDir) + + // Generate CA private key + if err := runOpenSSLCommand("genrsa", "-out", filepath.Join(certsDir, "ca.key"), "4096"); err != nil { + return fmt.Errorf("failed to generate CA key: %v", err) + } + + // Generate CA certificate + if err := runOpenSSLCommand("req", "-new", "-x509", "-key", filepath.Join(certsDir, "ca.key"), + "-sha256", "-subj", "/C=US/ST=CA/O=Cadence/CN=CadenceCA", "-days", "3650", + "-out", filepath.Join(certsDir, "ca.crt")); err != nil { + return fmt.Errorf("failed to generate CA certificate: 
%v", err) + } + + // Generate server private key + if err := runOpenSSLCommand("genrsa", "-out", filepath.Join(certsDir, "server.key"), "4096"); err != nil { + return fmt.Errorf("failed to generate server key: %v", err) + } + + // Generate server certificate request + if err := runOpenSSLCommand("req", "-new", "-key", filepath.Join(certsDir, "server.key"), + "-subj", "/C=US/ST=CA/O=Cadence/CN=localhost", + "-out", filepath.Join(certsDir, "server.csr")); err != nil { + return fmt.Errorf("failed to generate server CSR: %v", err) + } + + // Sign server certificate + if err := runOpenSSLCommand("x509", "-req", "-in", filepath.Join(certsDir, "server.csr"), + "-CA", filepath.Join(certsDir, "ca.crt"), "-CAkey", filepath.Join(certsDir, "ca.key"), + "-CAcreateserial", "-out", filepath.Join(certsDir, "server.crt"), + "-days", "365", "-sha256"); err != nil { + return fmt.Errorf("failed to sign server certificate: %v", err) + } + + // Generate client private key + if err := runOpenSSLCommand("genrsa", "-out", filepath.Join(certsDir, "client.key"), "4096"); err != nil { + return fmt.Errorf("failed to generate client key: %v", err) + } + + // Generate client certificate request + if err := runOpenSSLCommand("req", "-new", "-key", filepath.Join(certsDir, "client.key"), + "-subj", "/C=US/ST=CA/O=Cadence/CN=client", + "-out", filepath.Join(certsDir, "client.csr")); err != nil { + return fmt.Errorf("failed to generate client CSR: %v", err) + } + + // Sign client certificate + if err := runOpenSSLCommand("x509", "-req", "-in", filepath.Join(certsDir, "client.csr"), + "-CA", filepath.Join(certsDir, "ca.crt"), "-CAkey", filepath.Join(certsDir, "ca.key"), + "-CAcreateserial", "-out", filepath.Join(certsDir, "client.crt"), + "-days", "365", "-sha256"); err != nil { + return fmt.Errorf("failed to sign client certificate: %v", err) + } + + // Clean up CSR files + os.Remove(filepath.Join(certsDir, "server.csr")) + os.Remove(filepath.Join(certsDir, "client.csr")) + + return nil +} + +// cleanupCertificates removes all certificate files +func cleanupCertificates(certsDir string) { + certFiles := []string{"ca.key", "ca.crt", "ca.srl", "server.key", "server.crt", "server.csr", "client.key", "client.crt", "client.csr"} + for _, file := range certFiles { + os.Remove(filepath.Join(certsDir, file)) + } +} + +// runOpenSSLCommand executes an openssl command with given arguments +func runOpenSSLCommand(args ...string) error { + cmd := exec.Command("openssl", args...) 
+ cmd.Stdout = nil // Suppress output + cmd.Stderr = nil // Suppress errors for cleaner output + return cmd.Run() +} + +// createTLSClient creates a Cadence client with TLS configuration +func createTLSClient(logger *zap.Logger, clientCertPath, clientKeyPath, caCertPath string) (client.Client, error) { + // Load client certificate + clientCert, err := tls.LoadX509KeyPair(clientCertPath, clientKeyPath) + if err != nil { + return nil, err + } + + // Load CA certificate + caCert, err := ioutil.ReadFile(caCertPath) + if err != nil { + return nil, err + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + // Configure TLS + tlsConfig := tls.Config{ + InsecureSkipVerify: true, + RootCAs: caCertPool, + Certificates: []tls.Certificate{clientCert}, + } + creds := credentials.NewTLS(&tlsConfig) + + // Create gRPC transport with TLS + grpcTransport := grpc.NewTransport() + dialer := grpcTransport.NewDialer(grpc.DialerCredentials(creds)) + outbound := grpcTransport.NewOutbound(peer.NewSingle(hostport.Identify("localhost:7933"), dialer)) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: "cadence-client", + Outbounds: yarpc.Outbounds{ + "cadence-frontend": {Unary: outbound}, + }, + }) + + if err := dispatcher.Start(); err != nil { + return nil, err + } + + // Create service client + clientConfig := dispatcher.ClientConfig("cadence-frontend") + service := compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) + + return client.NewClient(service, "sample-domain", &client.Options{}), nil +} + +// createNonFatalLogger creates a logger that doesn't call os.Exit +func createNonFatalLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level = zap.NewAtomicLevelAt(zap.ErrorLevel) // Only show errors + logger, _ := config.Build() + return logger +} + +// fileExists checks if a file exists +func fileExists(filename string) bool { + _, err := os.Stat(filename) + return !os.IsNotExist(err) +} + +// getSimpleError extracts a simple error message for display +func getSimpleError(err error) string { + errStr := err.Error() + + switch { + case contains(errStr, "no such file or directory"): + return "Certificate file not found" + case contains(errStr, "connection refused"): + return "Cadence server not running" + case contains(errStr, "certificate verify failed"): + return "Certificate verification failed" + case contains(errStr, "tls: handshake failure"): + return "TLS handshake failed" + case contains(errStr, "dial tcp"): + return "Cannot connect to server" + default: + return "Connection error: " + errStr + } +} + +// contains checks if a string contains a substring +func contains(s, substr string) bool { + for i := 0; i <= len(s)-len(substr); i++ { + if s[i:i+len(substr)] == substr { + return true + } + } + return false +} diff --git a/cmd/samples/recipes/tls/tls_workflow.go b/cmd/samples/recipes/tls/tls_workflow.go new file mode 100644 index 0000000..07bf99e --- /dev/null +++ b/cmd/samples/recipes/tls/tls_workflow.go @@ -0,0 +1,159 @@ +package main + +import ( + "context" + "errors" + "time" + + "github.com/uber-common/cadence-samples/cmd/samples/common" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +const tlsWorkflowName = "tlsWorkflow" + +// tlsWorkflow demonstrates TLS connection testing workflow +func tlsWorkflow(ctx workflow.Context) 
error { + logger := workflow.GetLogger(ctx) + logger.Info("TLS Workflow started") + + // Set activity options + activityOptions := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + HeartbeatTimeout: time.Second * 20, + } + ctx = workflow.WithActivityOptions(ctx, activityOptions) + + // Test 1: Setup certificates + logger.Info("Setting up certificates for testing") + var certsDir string + err := workflow.ExecuteActivity(ctx, setupCertificatesActivity).Get(ctx, &certsDir) + if err != nil { + logger.Error("Certificate setup failed", zap.Error(err)) + return err + } + + // Test 2: Test TLS connection with valid certificates + logger.Info("Testing TLS connection with valid certificates") + var validResult string + err = workflow.ExecuteActivity(ctx, testTLSConnectionActivity, certsDir+"/client.crt", certsDir+"/client.key", certsDir+"/ca.crt").Get(ctx, &validResult) + if err != nil { + logger.Error("Valid TLS connection test failed", zap.Error(err)) + } else { + logger.Info("Valid TLS connection test result", zap.String("Result", validResult)) + } + + // Test 3: Test TLS connection with missing certificates + logger.Info("Testing TLS connection with missing certificates") + var invalidResult string + err = workflow.ExecuteActivity(ctx, testTLSConnectionActivity, "certs/missing.crt", "certs/missing.key", "certs/missing-ca.crt").Get(ctx, &invalidResult) + if err != nil { + logger.Info("Missing certificates test failed as expected", zap.Error(err)) + } else { + logger.Info("Missing certificates test result", zap.String("Result", invalidResult)) + } + + // Test 4: Test standard non-TLS connection + logger.Info("Testing standard non-TLS connection") + var standardResult string + err = workflow.ExecuteActivity(ctx, testStandardConnectionActivity).Get(ctx, &standardResult) + if err != nil { + logger.Error("Standard connection test failed", zap.Error(err)) + } else { + logger.Info("Standard connection test result", zap.String("Result", standardResult)) + } + + // Cleanup certificates + logger.Info("Cleaning up certificates") + var cleanupResult string + err = workflow.ExecuteActivity(ctx, cleanupCertificatesActivity, certsDir).Get(ctx, &cleanupResult) + if err != nil { + logger.Error("Certificate cleanup failed", zap.Error(err)) + } else { + logger.Info("Certificate cleanup completed", zap.String("Result", cleanupResult)) + } + + logger.Info("TLS Workflow completed successfully") + return nil +} + +// setupCertificatesActivity creates certificates for testing +func setupCertificatesActivity(ctx context.Context) (string, error) { + logger := activity.GetLogger(ctx) + logger.Info("Setting up certificates for testing") + + certsDir := setupCertificatesForTesting() + if certsDir == "" { + return "", errors.New("failed to setup certificates") + } + + logger.Info("Certificates setup completed", zap.String("CertsDir", certsDir)) + return certsDir, nil +} + +// testTLSConnectionActivity tests TLS connection with given certificates +func testTLSConnectionActivity(ctx context.Context, clientCert, clientKey, caCert string) (string, error) { + logger := activity.GetLogger(ctx) + logger.Info("Testing TLS connection", + zap.String("ClientCert", clientCert), + zap.String("ClientKey", clientKey), + zap.String("CACert", caCert)) + + // Pre-check certificate files exist + if !fileExists(clientCert) || !fileExists(clientKey) || !fileExists(caCert) { + return "FAILED - Certificate file not found", errors.New("certificate file not found") + } + + nonFatalLogger := 
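+	// Use an error-level logger that never calls os.Exit, so a failed TLS
+	// dial surfaces as an activity error instead of killing the worker.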
createNonFatalLogger() + + // Create TLS-enabled client manually + client, err := createTLSClient(nonFatalLogger, clientCert, clientKey, caCert) + if err != nil { + errorMsg := "FAILED - " + getSimpleError(err) + logger.Info("TLS connection failed", zap.String("Error", errorMsg)) + return errorMsg, err + } + _ = client + + result := "SUCCESS - TLS connection established" + logger.Info("TLS connection succeeded") + return result, nil +} + +// testStandardConnectionActivity tests standard non-TLS connection +func testStandardConnectionActivity(ctx context.Context) (string, error) { + logger := activity.GetLogger(ctx) + logger.Info("Testing standard connection") + + nonFatalLogger := createNonFatalLogger() + + builder := common.NewBuilder(nonFatalLogger). + SetHostPort("localhost:7933"). + SetDomain("sample-domain") + // No SetTLSConfig call - TLS is disabled + + _, err := builder.BuildCadenceClient() + if err != nil { + errorMsg := "FAILED - " + getSimpleError(err) + logger.Info("Standard connection failed", zap.String("Error", errorMsg)) + return errorMsg, err + } + + result := "SUCCESS - Standard connection established" + logger.Info("Standard connection succeeded") + return result, nil +} + +// cleanupCertificatesActivity removes certificate files +func cleanupCertificatesActivity(ctx context.Context, certsDir string) (string, error) { + logger := activity.GetLogger(ctx) + logger.Info("Cleaning up certificates", zap.String("CertsDir", certsDir)) + + cleanupCertificates(certsDir) + + result := "Certificates cleaned up successfully" + logger.Info("Certificate cleanup completed") + return result, nil +}