diff --git a/Makefile b/Makefile
index 469f3fb6..a8b8aa1b 100644
--- a/Makefile
+++ b/Makefile
@@ -36,6 +36,7 @@ PROGS = helloworld \
 	signalcounter \
 	sideeffect \
 	sleep \
+	dataconverter \
 
 TEST_ARG ?= -race -v -timeout 5m
 BUILD := ./build
@@ -70,6 +71,7 @@ TEST_DIRS=./cmd/samples/cron \
 	./cmd/samples/recipes/sideeffect \
 	./cmd/samples/recipes/signalcounter \
 	./cmd/samples/recipes/sleep \
+	./cmd/samples/recipes/dataconverter \
 	./cmd/samples/recovery \
 	./cmd/samples/pso \
 
@@ -181,6 +183,9 @@ sideeffect:
 versioning:
 	go build -o bin/versioning cmd/samples/recipes/versioning/*.go
 
+dataconverter:
+	go build -o bin/dataconverter cmd/samples/recipes/dataconverter/*.go
+
 bins: helloworld \
 	versioning \
 	delaystart \
@@ -213,6 +218,7 @@ bins: helloworld \
 	signalcounter \
 	sideeffect \
 	sleep \
+	dataconverter \
 
 test: bins
 	@rm -f test
diff --git a/cmd/samples/recipes/dataconverter/README.md b/cmd/samples/recipes/dataconverter/README.md
new file mode 100644
index 00000000..599b6888
--- /dev/null
+++ b/cmd/samples/recipes/dataconverter/README.md
@@ -0,0 +1,74 @@
+# Data Converter Sample
+
+This sample demonstrates how to plug a custom data converter into Cadence workflows, with gzip compression layered on top of JSON serialization. The data converter is responsible for serializing and deserializing workflow inputs, outputs, and activity parameters; compressing that data saves storage space and bandwidth.
+
+## Sample Description
+
+The sample implements a custom compressed JSON data converter that:
+- Serializes workflow inputs and activity parameters to JSON format
+- Compresses the JSON data using gzip compression to reduce size
+- Decompresses and deserializes workflow outputs and activity results from JSON format
+- Provides significant storage and bandwidth savings for large payloads
+- Demonstrates advanced data converter patterns for production use cases
+- Shows compression statistics and size comparisons when the workflow is triggered
+
+The sample runs a single workflow:
+- **Large Payload Workflow**: `largeDataConverterWorkflow` processes a complex `LargePayload` with nested objects, arrays, and extensive data to demonstrate compression benefits
+- A minimal `MyPayload` struct is also defined as a starting point for experimenting with simpler payloads
+
+All data is automatically compressed during serialization and decompressed during deserialization, with compression statistics displayed at runtime.
+
+## Key Components
+
+- **Custom Data Converter**: `compressedJSONDataConverter` implements the `encoded.DataConverter` interface with gzip compression
+- **Large Payload Workflow**: `largeDataConverterWorkflow` demonstrates processing complex data structures with compression
+- **Activity**: `largeDataConverterActivity` processes the large payload and returns it through the converter
+- **Large Payload Generator**: `CreateLargePayload()` creates realistic complex data for compression demonstration
+- **Compression Statistics**: `GetPayloadSizeInfo()` shows before/after compression metrics
+- **Simple Payload Type**: `MyPayload` is a minimal struct for experimenting with your own payloads
+- **Tests**: a unit test covers the large payload workflow, plus a converter round-trip check
+- **Compression**: Automatic gzip compression/decompression for all workflow data
+
+## Steps to Run Sample
+
+1. You need a Cadence service running. See details in cmd/samples/README.md
+
+2. Run the following command to start the worker:
+   ```
+   ./bin/dataconverter -m worker
+   ```
+
+3. Run the following command to execute the workflow:
+   ```
+   ./bin/dataconverter -m trigger
+   ```
+
+You should see:
+- Compression statistics showing original vs compressed data sizes
+- Workflow logs showing the processing of large payloads
+- Activity execution logs with payload information
+- Final workflow completion with compression benefits noted
+
+## Customization
+
+To implement your own data converter with compression or other features:
+1. Create a struct that implements the `encoded.DataConverter` interface
+2. Implement the `ToData` method for serialization and compression
+3. Implement the `FromData` method for decompression and deserialization
+4. Register the converter in the worker options
+
+This pattern is useful when you need to:
+- Reduce storage costs and bandwidth usage with compression
+- Use specific serialization formats for performance or compatibility
+- Add encryption/decryption to workflow data
+- Implement custom compression algorithms (LZ4, Snappy, etc.)
+- Support legacy data formats
+- Add data validation or transformation during serialization
+
+## Performance Benefits
+
+The compressed data converter provides:
+- **Storage Savings**: Typically 60-80% reduction in data size for JSON payloads
+- **Bandwidth Reduction**: Lower network transfer costs and faster data transmission
+- **Cost Optimization**: Reduced storage costs in Cadence history
+- **Scalability**: Better performance with large payloads
\ No newline at end of file
diff --git a/cmd/samples/recipes/dataconverter/dataconverter.go b/cmd/samples/recipes/dataconverter/dataconverter.go
new file mode 100644
index 00000000..326eef34
--- /dev/null
+++ b/cmd/samples/recipes/dataconverter/dataconverter.go
@@ -0,0 +1,71 @@
+package main
+
+import (
+	"bytes"
+	"compress/gzip"
+	"encoding/json"
+	"fmt"
+	"io"
+	"reflect"
+
+	"go.uber.org/cadence/encoded"
+)
+
+type compressedJSONDataConverter struct{}
+
+func NewCompressedJSONDataConverter() encoded.DataConverter {
+	return &compressedJSONDataConverter{}
+}
+
+func (dc *compressedJSONDataConverter) ToData(value ...interface{}) ([]byte, error) {
+	// First, serialize to JSON
+	var jsonBuf bytes.Buffer
+	enc := json.NewEncoder(&jsonBuf)
+	for i, obj := range value {
+		err := enc.Encode(obj)
+		if err != nil {
+			return nil, fmt.Errorf("unable to encode argument: %d, %v, with error: %v", i, reflect.TypeOf(obj), err)
+		}
+	}
+
+	// Then compress the JSON data
+	var compressedBuf bytes.Buffer
+	gzipWriter := gzip.NewWriter(&compressedBuf)
+
+	_, err := gzipWriter.Write(jsonBuf.Bytes())
+	if err != nil {
+		return nil, fmt.Errorf("unable to compress data: %v", err)
+	}
+
+	err = gzipWriter.Close()
+	if err != nil {
+		return nil, fmt.Errorf("unable to close gzip writer: %v", err)
+	}
+
+	return compressedBuf.Bytes(), nil
+}
+
+func (dc *compressedJSONDataConverter) FromData(input []byte, valuePtr ...interface{}) error {
+	// First, decompress the data
+	gzipReader, err := gzip.NewReader(bytes.NewBuffer(input))
+	if err != nil {
+		return fmt.Errorf("unable to create gzip reader: %v", err)
+	}
+	defer gzipReader.Close()
+
+	// Read the decompressed JSON data
+	decompressedData, err := io.ReadAll(gzipReader)
+	if err != nil {
+		return fmt.Errorf("unable to decompress data: %v", err)
+	}
+
+	// Then deserialize from JSON
+	dec := json.NewDecoder(bytes.NewBuffer(decompressedData))
+	for i, obj := range valuePtr {
+		err := dec.Decode(obj)
+		if err != nil {
+			return fmt.Errorf("unable to decode argument: %d, %v, with error: %v", i, reflect.TypeOf(obj), err)
+		}
+	}
+	return nil
+}
diff --git a/cmd/samples/recipes/dataconverter/large_payload.go b/cmd/samples/recipes/dataconverter/large_payload.go
new file mode 100644
index 00000000..fce629a9
--- /dev/null
+++ b/cmd/samples/recipes/dataconverter/large_payload.go
@@ -0,0 +1,223 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"strings"
+
+	"go.uber.org/cadence/encoded"
+)
+
+// LargePayload represents a complex data structure with nested objects and arrays
+type LargePayload struct {
+	ID          string                 `json:"id"`
+	Name        string                 `json:"name"`
+	Description string                 `json:"description"`
+	Metadata    map[string]interface{} `json:"metadata"`
+	Items       []Item                 `json:"items"`
+	Config      Config                 `json:"config"`
+	History     []HistoryEntry         `json:"history"`
+	Tags        []string               `json:"tags"`
+	Stats       Statistics             `json:"statistics"`
+}
+
+// Item represents a single item in the payload
+type Item struct {
+	ItemID      string            `json:"item_id"`
+	Title       string            `json:"title"`
+	Description string            `json:"description"`
+	Price       float64           `json:"price"`
+	Categories  []string          `json:"categories"`
+	Attributes  map[string]string `json:"attributes"`
+	Reviews     []Review          `json:"reviews"`
+	Inventory   Inventory         `json:"inventory"`
+}
+
+// Review represents a product review
+type Review struct {
+	ReviewID   string  `json:"review_id"`
+	UserID     string  `json:"user_id"`
+	Rating     int     `json:"rating"`
+	Comment    string  `json:"comment"`
+	Helpful    int     `json:"helpful_votes"`
+	NotHelpful int     `json:"not_helpful_votes"`
+	Date       string  `json:"date"`
+	Verified   bool    `json:"verified_purchase"`
+	Score      float64 `json:"score"`
+}
+
+// Inventory represents inventory information
+type Inventory struct {
+	Quantity    int    `json:"quantity"`
+	Location    string `json:"location"`
+	LastUpdated string `json:"last_updated"`
+	Status      string `json:"status"`
+}
+
+// Config represents configuration settings
+type Config struct {
+	Version     string            `json:"version"`
+	Environment string            `json:"environment"`
+	Settings    map[string]string `json:"settings"`
+	Features    []string          `json:"features"`
+	Limits      Limits            `json:"limits"`
+}
+
+// Limits represents system limits
+type Limits struct {
+	MaxItems    int `json:"max_items"`
+	MaxRequests int `json:"max_requests_per_minute"`
+	MaxFileSize int `json:"max_file_size_mb"`
+	MaxUsers    int `json:"max_concurrent_users"`
+	TimeoutSecs int `json:"timeout_seconds"`
+}
+
+// HistoryEntry represents a historical event
+type HistoryEntry struct {
+	EventID   string            `json:"event_id"`
+	Timestamp string            `json:"timestamp"`
+	EventType string            `json:"event_type"`
+	UserID    string            `json:"user_id"`
+	Details   map[string]string `json:"details"`
+	Severity  string            `json:"severity"`
+}
+
+// Statistics represents statistical data
+type Statistics struct {
+	TotalItems     int     `json:"total_items"`
+	TotalUsers     int     `json:"total_users"`
+	AverageRating  float64 `json:"average_rating"`
+	TotalRevenue   float64 `json:"total_revenue"`
+	ActiveOrders   int     `json:"active_orders"`
+	CompletionRate float64 `json:"completion_rate"`
+}
+
+// CreateLargePayload creates a sample large payload with realistic data
+func CreateLargePayload() LargePayload {
+	// Create a large description with repeated text to demonstrate compression
+	largeDescription := strings.Repeat("This is a comprehensive product catalog containing thousands of items with detailed descriptions, specifications, and user reviews. Each item includes pricing information, inventory status, and customer feedback. The catalog is designed to provide complete information for customers making purchasing decisions. ", 50)
+
+	// Create sample items
+	items := make([]Item, 100)
+	for i := 0; i < 100; i++ {
+		reviews := make([]Review, 25)
+		for j := 0; j < 25; j++ {
+			reviews[j] = Review{
+				ReviewID:   fmt.Sprintf("review_%d_%d", i, j),
+				UserID:     fmt.Sprintf("user_%d", j),
+				Rating:     1 + (j % 5),
+				Comment:    strings.Repeat("This is a detailed customer review with comprehensive feedback about the product quality, delivery experience, and overall satisfaction. The customer provides specific details about their experience. ", 3),
+				Helpful:    j * 2,
+				NotHelpful: j,
+				Date:       "2024-01-15T10:30:00Z",
+				Verified:   j%2 == 0,
+				Score:      float64(1+(j%5)) + float64(j%10)/10.0,
+			}
+		}
+
+		attributes := make(map[string]string)
+		for k := 0; k < 20; k++ {
+			attributes[fmt.Sprintf("attr_%d", k)] = strings.Repeat("This is a detailed attribute description with comprehensive information about the product specification. ", 2)
+		}
+
+		items[i] = Item{
+			ItemID:      fmt.Sprintf("item_%d", i),
+			Title:       fmt.Sprintf("High-Quality Product %d with Advanced Features", i),
+			Description: strings.Repeat("This is a premium product with exceptional quality and advanced features designed for professional use. It includes comprehensive documentation and support. ", 10),
+			Price:       float64(100+i*10) + float64(i%100)/100.0,
+			Categories:  []string{"Electronics", "Professional", "Premium", "Advanced"},
+			Attributes:  attributes,
+			Reviews:     reviews,
+			Inventory: Inventory{
+				Quantity:    100 + i,
+				Location:    fmt.Sprintf("Warehouse %d", i%5),
+				LastUpdated: "2024-01-15T10:30:00Z",
+				Status:      "In Stock",
+			},
+		}
+	}
+
+	// Create history entries
+	history := make([]HistoryEntry, 50)
+	for i := 0; i < 50; i++ {
+		details := make(map[string]string)
+		for j := 0; j < 10; j++ {
+			details[fmt.Sprintf("detail_%d", j)] = strings.Repeat("This is a detailed event description with comprehensive information about the system event and its impact. ", 2)
+		}
+
+		history[i] = HistoryEntry{
+			EventID:   fmt.Sprintf("event_%d", i),
+			Timestamp: "2024-01-15T10:30:00Z",
+			EventType: "system_update",
+			UserID:    fmt.Sprintf("admin_%d", i%5),
+			Details:   details,
+			Severity:  "medium",
+		}
+	}
+
+	// Create metadata
+	metadata := make(map[string]interface{})
+	for i := 0; i < 30; i++ {
+		metadata[fmt.Sprintf("meta_key_%d", i)] = strings.Repeat("This is comprehensive metadata information with detailed descriptions and specifications. ", 5)
+	}
+
+	return LargePayload{
+		ID:          "large_payload_001",
+		Name:        "Comprehensive Product Catalog",
+		Description: largeDescription,
+		Metadata:    metadata,
+		Items:       items,
+		Config: Config{
+			Version:     "2.1.0",
+			Environment: "production",
+			Settings: map[string]string{
+				"cache_enabled":     "true",
+				"compression_level": "high",
+				"timeout":           "30s",
+				"max_connections":   "1000",
+				"retry_attempts":    "3",
+			},
+			Features: []string{"advanced_search", "real_time_updates", "analytics", "reporting", "integration"},
+			Limits: Limits{
+				MaxItems:    10000,
+				MaxRequests: 1000,
+				MaxFileSize: 100,
+				MaxUsers:    5000,
+				TimeoutSecs: 30,
+			},
+		},
+		History: history,
+		Tags:    []string{"catalog", "products", "inventory", "analytics", "reporting", "integration", "api", "dashboard"},
+		Stats: Statistics{
+			TotalItems:     10000,
+			TotalUsers:     5000,
+			AverageRating:  4.2,
+			TotalRevenue:   1250000.50,
+			ActiveOrders:   250,
+			CompletionRate: 98.5,
+		},
+	}
+}
+
+// GetPayloadSizeInfo returns information about the payload size before and after compression
+func GetPayloadSizeInfo(payload LargePayload, converter encoded.DataConverter) (int, int, float64, error) {
+	// Serialize to JSON to get original size
+	jsonData, err := json.Marshal(payload)
+	if err != nil {
+		return 0, 0, 0, fmt.Errorf("failed to marshal payload: %v", err)
+	}
+	originalSize := len(jsonData)
+
+	// Compress using our converter
+	compressedData, err := converter.ToData(payload)
+	if err != nil {
+		return 0, 0, 0, fmt.Errorf("failed to compress payload: %v", err)
+	}
+	compressedSize := len(compressedData)
+
+	// Calculate compression ratio
+	compressionRatio := float64(compressedSize) / float64(originalSize)
+	compressionPercentage := (1.0 - compressionRatio) * 100
+
+	return originalSize, compressedSize, compressionPercentage, nil
+}
diff --git a/cmd/samples/recipes/dataconverter/main.go b/cmd/samples/recipes/dataconverter/main.go
new file mode 100644
index 00000000..fa2af203
--- /dev/null
+++ b/cmd/samples/recipes/dataconverter/main.go
@@ -0,0 +1,80 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"time"
+
+	"github.com/pborman/uuid"
+	"go.uber.org/cadence/client"
+	"go.uber.org/cadence/worker"
+
+	"github.com/uber-common/cadence-samples/cmd/samples/common"
+)
+
+const (
+	ApplicationName = "dataConverterTaskList"
+)
+
+func startWorkers(h *common.SampleHelper) {
+	workerOptions := worker.Options{
+		MetricsScope: h.WorkerMetricScope,
+		Logger:       h.Logger,
+		FeatureFlags: client.FeatureFlags{
+			WorkflowExecutionAlreadyCompletedErrorEnabled: true,
+		},
+		DataConverter: NewCompressedJSONDataConverter(),
+	}
+	h.StartWorkers(h.Config.DomainName, ApplicationName, workerOptions)
+}
+
+func startWorkflow(h *common.SampleHelper) {
+	// Create a large payload to demonstrate compression benefits
+	largeInput := CreateLargePayload()
+
+	// Show compression statistics before starting workflow
+	converter := NewCompressedJSONDataConverter()
+	originalSize, compressedSize, compressionPercentage, err := GetPayloadSizeInfo(largeInput, converter)
+	if err != nil {
+		fmt.Printf("Error calculating compression stats: %v\n", err)
+	} else {
+		fmt.Printf("=== Compression Statistics ===\n")
+		fmt.Printf("Original JSON size: %d bytes (%.2f KB)\n", originalSize, float64(originalSize)/1024.0)
+		fmt.Printf("Compressed size: %d bytes (%.2f KB)\n", compressedSize, float64(compressedSize)/1024.0)
+		fmt.Printf("Compression ratio: %.2f%% reduction\n", compressionPercentage)
+		fmt.Printf("Space saved: %d bytes (%.2f KB)\n", originalSize-compressedSize, float64(originalSize-compressedSize)/1024.0)
+		fmt.Printf("=============================\n\n")
+	}
+
+	workflowOptions := client.StartWorkflowOptions{
+		ID:                              "dataconverter_" + uuid.New(),
+		TaskList:                        ApplicationName,
+		ExecutionStartToCloseTimeout:    time.Minute,
+		DecisionTaskStartToCloseTimeout: time.Minute,
+	}
+	h.StartWorkflow(workflowOptions, LargeDataConverterWorkflowName, largeInput)
+}
+
+func registerWorkflowAndActivity(h *common.SampleHelper) {
+	h.RegisterWorkflowWithAlias(largeDataConverterWorkflow, LargeDataConverterWorkflowName)
+	h.RegisterActivity(largeDataConverterActivity)
+}
+
+func main() {
+	var mode string
+	flag.StringVar(&mode, "m", "trigger", "Mode is worker or trigger.")
+	flag.Parse()
+
+	var h common.SampleHelper
+	h.DataConverter = NewCompressedJSONDataConverter()
+	h.SetupServiceConfig()
+
+	switch mode {
+	case "worker":
+		registerWorkflowAndActivity(&h)
+		startWorkers(&h)
+		select {}
+	case "trigger":
+		startWorkflow(&h)
+	}
+}
diff --git a/cmd/samples/recipes/dataconverter/workflow.go b/cmd/samples/recipes/dataconverter/workflow.go
new file mode 100644
index 00000000..2edc1a51
--- /dev/null
+++ b/cmd/samples/recipes/dataconverter/workflow.go
@@ -0,0 +1,54 @@
+package main
+
+import (
+	"context"
+	"time"
+
+	"go.uber.org/cadence/activity"
+	"go.uber.org/cadence/workflow"
+	"go.uber.org/zap"
+)
+
+type MyPayload struct {
+	Msg   string
+	Count int
+}
+
+// LargeDataConverterWorkflowName is the workflow name for large payload processing
+const LargeDataConverterWorkflowName = "largeDataConverterWorkflow"
+
+// largeDataConverterWorkflow demonstrates processing large payloads with compression
+func largeDataConverterWorkflow(ctx workflow.Context, input LargePayload) (LargePayload, error) {
+	logger := workflow.GetLogger(ctx)
+	logger.Info("Large payload workflow started", zap.String("payload_id", input.ID))
+	logger.Info("Processing large payload with compression", zap.Int("items_count", len(input.Items)))
+
+	activityOptions := workflow.ActivityOptions{
+		ScheduleToStartTimeout: time.Minute,
+		StartToCloseTimeout:    time.Minute,
+	}
+	ctx = workflow.WithActivityOptions(ctx, activityOptions)
+
+	var result LargePayload
+	err := workflow.ExecuteActivity(ctx, largeDataConverterActivity, input).Get(ctx, &result)
+	if err != nil {
+		logger.Error("Large payload activity failed", zap.Error(err))
+		return LargePayload{}, err
+	}
+
+	logger.Info("Large payload workflow completed", zap.String("result_id", result.ID))
+	logger.Info("Note: All large payload data was automatically compressed/decompressed using gzip compression")
+	return result, nil
+}
+
+func largeDataConverterActivity(ctx context.Context, input LargePayload) (LargePayload, error) {
+	logger := activity.GetLogger(ctx)
+	logger.Info("Large payload activity received input", zap.String("payload_id", input.ID), zap.Int("items_count", len(input.Items)))
+
+	// Process the large payload (in a real scenario, this might involve data transformation, validation, etc.)
+	input.Name = input.Name + " (Processed)"
+	input.Stats.TotalItems = len(input.Items)
+
+	logger.Info("Large payload activity completed", zap.String("result_id", input.ID))
+	return input, nil
+}
diff --git a/cmd/samples/recipes/dataconverter/workflow_test.go b/cmd/samples/recipes/dataconverter/workflow_test.go
new file mode 100644
index 00000000..4313cff5
--- /dev/null
+++ b/cmd/samples/recipes/dataconverter/workflow_test.go
@@ -0,0 +1,52 @@
+package main
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"go.uber.org/cadence/activity"
+	"go.uber.org/cadence/encoded"
+	"go.uber.org/cadence/testsuite"
+	"go.uber.org/cadence/worker"
+)
+
+func Test_LargeDataConverterWorkflow(t *testing.T) {
+	testSuite := &testsuite.WorkflowTestSuite{}
+	env := testSuite.NewTestWorkflowEnvironment()
+	env.RegisterWorkflow(largeDataConverterWorkflow)
+	env.RegisterActivity(largeDataConverterActivity)
+
+	dataConverter := NewCompressedJSONDataConverter()
+	workerOptions := worker.Options{
+		DataConverter: dataConverter,
+	}
+	env.SetWorkerOptions(workerOptions)
+
+	input := CreateLargePayload()
+
+	var activityResult LargePayload
+	env.SetOnActivityCompletedListener(func(activityInfo *activity.Info, result encoded.Value, err error) {
+		result.Get(&activityResult)
+	})
+
+	env.ExecuteWorkflow(largeDataConverterWorkflow, input)
+
+	require.True(t, env.IsWorkflowCompleted())
+	require.NoError(t, env.GetWorkflowError())
+	require.Equal(t, "Comprehensive Product Catalog (Processed)", activityResult.Name)
+	require.Equal(t, 100, activityResult.Stats.TotalItems)
+}
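+
+// Test_CompressedJSONDataConverter_RoundTrip is a small illustrative check, added alongside the
+// workflow test, that the converter's ToData/FromData pair is lossless for a simple value.
+func Test_CompressedJSONDataConverter_RoundTrip(t *testing.T) {
+	converter := NewCompressedJSONDataConverter()
+
+	in := MyPayload{Msg: "hello", Count: 3}
+	data, err := converter.ToData(in)
+	require.NoError(t, err)
+
+	var out MyPayload
+	require.NoError(t, converter.FromData(data, &out))
+	require.Equal(t, in, out)
+}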