From ca038c6edfe2b2990e84ec9cf2f020e4fa24f746 Mon Sep 17 00:00:00 2001 From: "vpatil16@ext.uber.com" Date: Tue, 29 Jul 2025 11:25:07 -0700 Subject: [PATCH 1/7] Workflow for DataConverter --- Makefile | 6 ++ .../recipes/dataconverter/dataconverter.go | 39 ++++++++++++ cmd/samples/recipes/dataconverter/main.go | 63 +++++++++++++++++++ cmd/samples/recipes/dataconverter/workflow.go | 46 ++++++++++++++ .../recipes/dataconverter/workflow_test.go | 38 +++++++++++ 5 files changed, 192 insertions(+) create mode 100644 cmd/samples/recipes/dataconverter/dataconverter.go create mode 100644 cmd/samples/recipes/dataconverter/main.go create mode 100644 cmd/samples/recipes/dataconverter/workflow.go create mode 100644 cmd/samples/recipes/dataconverter/workflow_test.go diff --git a/Makefile b/Makefile index 81da9515..d5e17f12 100644 --- a/Makefile +++ b/Makefile @@ -35,6 +35,7 @@ PROGS = helloworld \ pageflow \ signalcounter \ sideeffect \ + dataconverter \ TEST_ARG ?= -race -v -timeout 5m BUILD := ./build @@ -68,6 +69,7 @@ TEST_DIRS=./cmd/samples/cron \ ./cmd/samples/recipes/searchattributes \ ./cmd/samples/recipes/sideeffect \ ./cmd/samples/recipes/signalcounter \ + ./cmd/samples/recipes/dataconverter \ ./cmd/samples/recovery \ ./cmd/samples/pso \ @@ -176,6 +178,9 @@ sideeffect: versioning: go build -o bin/versioning cmd/samples/recipes/versioning/*.go +dataconverter: + go build -o bin/dataconverter cmd/samples/recipes/dataconverter/*.go + bins: helloworld \ versioning \ delaystart \ @@ -207,6 +212,7 @@ bins: helloworld \ pageflow \ signalcounter \ sideeffect \ + dataconverter \ test: bins @rm -f test diff --git a/cmd/samples/recipes/dataconverter/dataconverter.go b/cmd/samples/recipes/dataconverter/dataconverter.go new file mode 100644 index 00000000..217ea14d --- /dev/null +++ b/cmd/samples/recipes/dataconverter/dataconverter.go @@ -0,0 +1,39 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "reflect" + + "go.uber.org/cadence/encoded" +) + +type jsonDataConverter struct{} + +func NewJSONDataConverter() encoded.DataConverter { + return &jsonDataConverter{} +} + +func (dc *jsonDataConverter) ToData(value ...interface{}) ([]byte, error) { + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + for i, obj := range value { + err := enc.Encode(obj) + if err != nil { + return nil, fmt.Errorf("unable to encode argument: %d, %v, with error: %v", i, reflect.TypeOf(obj), err) + } + } + return buf.Bytes(), nil +} + +func (dc *jsonDataConverter) FromData(input []byte, valuePtr ...interface{}) error { + dec := json.NewDecoder(bytes.NewBuffer(input)) + for i, obj := range valuePtr { + err := dec.Decode(obj) + if err != nil { + return fmt.Errorf("unable to decode argument: %d, %v, with error: %v", i, reflect.TypeOf(obj), err) + } + } + return nil +} diff --git a/cmd/samples/recipes/dataconverter/main.go b/cmd/samples/recipes/dataconverter/main.go new file mode 100644 index 00000000..3870db94 --- /dev/null +++ b/cmd/samples/recipes/dataconverter/main.go @@ -0,0 +1,63 @@ +package main + +import ( + "flag" + "time" + + "github.com/pborman/uuid" + "go.uber.org/cadence/client" + "go.uber.org/cadence/worker" + + "github.com/uber-common/cadence-samples/cmd/samples/common" +) + +const ( + ApplicationName = "dataConverterTaskList" +) + +func startWorkers(h *common.SampleHelper) { + workerOptions := worker.Options{ + MetricsScope: h.WorkerMetricScope, + Logger: h.Logger, + FeatureFlags: client.FeatureFlags{ + WorkflowExecutionAlreadyCompletedErrorEnabled: true, + }, + DataConverter: NewJSONDataConverter(), + } + 
h.StartWorkers(h.Config.DomainName, ApplicationName, workerOptions) +} + +func startWorkflow(h *common.SampleHelper) { + input := MyPayload{Msg: "hello", Count: 1} + workflowOptions := client.StartWorkflowOptions{ + ID: "dataconverter_" + uuid.New(), + TaskList: ApplicationName, + ExecutionStartToCloseTimeout: time.Minute, + DecisionTaskStartToCloseTimeout: time.Minute, + } + h.StartWorkflow(workflowOptions, DataConverterWorkflowName, input) +} + +func registerWorkflowAndActivity(h *common.SampleHelper) { + h.RegisterWorkflowWithAlias(dataConverterWorkflow, DataConverterWorkflowName) + h.RegisterActivity(dataConverterActivity) +} + +func main() { + var mode string + flag.StringVar(&mode, "m", "trigger", "Mode is worker or trigger.") + flag.Parse() + + var h common.SampleHelper + h.DataConverter = NewJSONDataConverter() + h.SetupServiceConfig() + + switch mode { + case "worker": + registerWorkflowAndActivity(&h) + startWorkers(&h) + select {} + case "trigger": + startWorkflow(&h) + } +} diff --git a/cmd/samples/recipes/dataconverter/workflow.go b/cmd/samples/recipes/dataconverter/workflow.go new file mode 100644 index 00000000..2ea49300 --- /dev/null +++ b/cmd/samples/recipes/dataconverter/workflow.go @@ -0,0 +1,46 @@ +package main + +import ( + "context" + "time" + + "go.uber.org/cadence/activity" + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +type MyPayload struct { + Msg string + Count int +} + +const DataConverterWorkflowName = "dataConverterWorkflow" + +func dataConverterWorkflow(ctx workflow.Context, input MyPayload) (MyPayload, error) { + logger := workflow.GetLogger(ctx) + logger.Info("Workflow started", zap.Any("input", input)) + + activityOptions := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + } + ctx = workflow.WithActivityOptions(ctx, activityOptions) + + var result MyPayload + err := workflow.ExecuteActivity(ctx, dataConverterActivity, input).Get(ctx, &result) + if err != nil { + logger.Error("Activity failed", zap.Error(err)) + return MyPayload{}, err + } + logger.Info("Workflow completed", zap.Any("result", result)) + return result, nil +} + +func dataConverterActivity(ctx context.Context, input MyPayload) (MyPayload, error) { + logger := activity.GetLogger(ctx) + logger.Info("Activity received input", zap.Any("input", input)) + input.Msg = input.Msg + " processed" + input.Count++ + logger.Info("Activity returning", zap.Any("output", input)) + return input, nil +} diff --git a/cmd/samples/recipes/dataconverter/workflow_test.go b/cmd/samples/recipes/dataconverter/workflow_test.go new file mode 100644 index 00000000..c0a3f76d --- /dev/null +++ b/cmd/samples/recipes/dataconverter/workflow_test.go @@ -0,0 +1,38 @@ +package main + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/encoded" + "go.uber.org/cadence/testsuite" + "go.uber.org/cadence/worker" +) + +func Test_DataConverterWorkflow(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + env.RegisterWorkflow(dataConverterWorkflow) + env.RegisterActivity(dataConverterActivity) + + dataConverter := NewJSONDataConverter() + workerOptions := worker.Options{ + DataConverter: dataConverter, + } + env.SetWorkerOptions(workerOptions) + + input := MyPayload{Msg: "test", Count: 42} + + var activityResult MyPayload + env.SetOnActivityCompletedListener(func(activityInfo *activity.Info, result encoded.Value, err error) { + 
result.Get(&activityResult) + }) + + env.ExecuteWorkflow(dataConverterWorkflow, input) + + require.True(t, env.IsWorkflowCompleted()) + require.NoError(t, env.GetWorkflowError()) + require.Equal(t, "test processed", activityResult.Msg) + require.Equal(t, 43, activityResult.Count) +} From b2fe6c0575f6d07845067130816e50b94707827a Mon Sep 17 00:00:00 2001 From: "vpatil16@ext.uber.com" Date: Tue, 29 Jul 2025 11:29:29 -0700 Subject: [PATCH 2/7] Workflow for DataConverter --- cmd/samples/recipes/dataconverter/README.md | 50 +++++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 cmd/samples/recipes/dataconverter/README.md diff --git a/cmd/samples/recipes/dataconverter/README.md b/cmd/samples/recipes/dataconverter/README.md new file mode 100644 index 00000000..a67f5917 --- /dev/null +++ b/cmd/samples/recipes/dataconverter/README.md @@ -0,0 +1,50 @@ +# Data Converter Sample + +This sample workflow demonstrates how to use custom data converters in Cadence workflows. The data converter is responsible for serializing and deserializing workflow inputs, outputs, and activity parameters. + +## Sample Description + +The sample implements a custom JSON data converter that: +- Serializes workflow inputs and activity parameters to JSON format +- Deserializes workflow outputs and activity results from JSON format +- Provides better control over data serialization compared to the default data converter +- Can be extended to support custom serialization formats (e.g., Protocol Buffers, MessagePack) + +The workflow takes a `MyPayload` struct as input, processes it through an activity, and returns the modified payload. + +## Key Components + +- **Custom Data Converter**: `jsonDataConverter` implements the `encoded.DataConverter` interface +- **Workflow**: `dataConverterWorkflow` demonstrates using custom data types with the converter +- **Activity**: `dataConverterActivity` processes the input and returns modified data +- **Test**: Includes unit tests to verify the data converter functionality + +## Steps to Run Sample + +1. You need a cadence service running. See details in cmd/samples/README.md + +2. Run the following command to start the worker: + ``` + ./bin/dataconverter -m worker + ``` + +3. Run the following command to execute the workflow: + ``` + ./bin/dataconverter -m trigger + ``` + +You should see logs showing the workflow input being processed through the activity and the final result being returned. + +## Customization + +To use a different serialization format, you can implement your own data converter by: +1. Creating a struct that implements the `encoded.DataConverter` interface +2. Implementing the `ToData` method for serialization +3. Implementing the `FromData` method for deserialization +4. 
Registering the converter in the worker options + +This pattern is useful when you need to: +- Use specific serialization formats for performance or compatibility +- Add encryption/decryption to workflow data +- Implement custom compression for large payloads +- Support legacy data formats \ No newline at end of file From 7015866cd56f45e89810b47dccadb7ab65b9a30f Mon Sep 17 00:00:00 2001 From: "vpatil16@ext.uber.com" Date: Tue, 29 Jul 2025 19:04:01 -0700 Subject: [PATCH 3/7] data converters with compression --- cmd/samples/recipes/dataconverter/README.md | 60 +++-- .../recipes/dataconverter/dataconverter.go | 62 ++++- .../recipes/dataconverter/large_payload.go | 223 ++++++++++++++++++ cmd/samples/recipes/dataconverter/main.go | 27 ++- cmd/samples/recipes/dataconverter/workflow.go | 41 ++++ .../recipes/dataconverter/workflow_test.go | 29 ++- 6 files changed, 410 insertions(+), 32 deletions(-) create mode 100644 cmd/samples/recipes/dataconverter/large_payload.go diff --git a/cmd/samples/recipes/dataconverter/README.md b/cmd/samples/recipes/dataconverter/README.md index a67f5917..599b6888 100644 --- a/cmd/samples/recipes/dataconverter/README.md +++ b/cmd/samples/recipes/dataconverter/README.md @@ -1,23 +1,33 @@ # Data Converter Sample -This sample workflow demonstrates how to use custom data converters in Cadence workflows. The data converter is responsible for serializing and deserializing workflow inputs, outputs, and activity parameters. +This sample workflow demonstrates how to use custom data converters in Cadence workflows with compression capabilities. The data converter is responsible for serializing and deserializing workflow inputs, outputs, and activity parameters, with the added benefit of data compression to save storage space and bandwidth. ## Sample Description -The sample implements a custom JSON data converter that: +The sample implements a custom compressed JSON data converter that: - Serializes workflow inputs and activity parameters to JSON format -- Deserializes workflow outputs and activity results from JSON format -- Provides better control over data serialization compared to the default data converter -- Can be extended to support custom serialization formats (e.g., Protocol Buffers, MessagePack) +- Compresses the JSON data using gzip compression to reduce size +- Decompresses and deserializes workflow outputs and activity results from JSON format +- Provides significant storage and bandwidth savings for large payloads +- Demonstrates advanced data converter patterns for production use cases +- Shows real-time compression statistics and size comparisons -The workflow takes a `MyPayload` struct as input, processes it through an activity, and returns the modified payload. +The sample includes two workflows: +1. **Simple Workflow**: Processes a basic `MyPayload` struct +2. **Large Payload Workflow**: Processes a complex `LargePayload` with nested objects, arrays, and extensive data to demonstrate compression benefits + +All data is automatically compressed during serialization and decompressed during deserialization, with compression statistics displayed at runtime. 
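+
+The core of the converter is a JSON-encode-then-gzip round trip. Below is a minimal sketch of the serialization half, mirroring the `ToData` flow in `dataconverter.go` (the helper name is illustrative only; the real converter also implements `FromData` and reports per-argument errors):
+
+```
+package main
+
+import (
+	"bytes"
+	"compress/gzip"
+	"encoding/json"
+)
+
+// toCompressedJSON JSON-encodes each value into one buffer, then gzips the
+// whole buffer; FromData reverses the two steps (gunzip, then JSON-decode).
+func toCompressedJSON(values ...interface{}) ([]byte, error) {
+	var jsonBuf bytes.Buffer
+	enc := json.NewEncoder(&jsonBuf)
+	for _, v := range values {
+		if err := enc.Encode(v); err != nil {
+			return nil, err
+		}
+	}
+
+	var compressed bytes.Buffer
+	zw := gzip.NewWriter(&compressed)
+	if _, err := zw.Write(jsonBuf.Bytes()); err != nil {
+		return nil, err
+	}
+	if err := zw.Close(); err != nil {
+		return nil, err
+	}
+	return compressed.Bytes(), nil
+}
+```
+
+Registering the converter on both the worker options and the workflow-starting client (see `main.go`) is what makes the compression transparent to workflow and activity code.
+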
## Key Components -- **Custom Data Converter**: `jsonDataConverter` implements the `encoded.DataConverter` interface -- **Workflow**: `dataConverterWorkflow` demonstrates using custom data types with the converter -- **Activity**: `dataConverterActivity` processes the input and returns modified data -- **Test**: Includes unit tests to verify the data converter functionality +- **Custom Data Converter**: `compressedJSONDataConverter` implements the `encoded.DataConverter` interface with gzip compression +- **Simple Workflow**: `dataConverterWorkflow` demonstrates basic payload processing with compression +- **Large Payload Workflow**: `largeDataConverterWorkflow` demonstrates processing complex data structures with compression +- **Activities**: `dataConverterActivity` and `largeDataConverterActivity` process different payload types +- **Large Payload Generator**: `CreateLargePayload()` creates realistic complex data for compression demonstration +- **Compression Statistics**: `GetPayloadSizeInfo()` shows before/after compression metrics +- **Tests**: Includes unit tests for both simple and large payload workflows +- **Compression**: Automatic gzip compression/decompression for all workflow data ## Steps to Run Sample @@ -33,18 +43,32 @@ The workflow takes a `MyPayload` struct as input, processes it through an activi ./bin/dataconverter -m trigger ``` -You should see logs showing the workflow input being processed through the activity and the final result being returned. +You should see: +- Compression statistics showing original vs compressed data sizes +- Workflow logs showing the processing of large payloads +- Activity execution logs with payload information +- Final workflow completion with compression benefits noted ## Customization -To use a different serialization format, you can implement your own data converter by: -1. Creating a struct that implements the `encoded.DataConverter` interface -2. Implementing the `ToData` method for serialization -3. Implementing the `FromData` method for deserialization -4. Registering the converter in the worker options +To implement your own data converter with compression or other features: +1. Create a struct that implements the `encoded.DataConverter` interface +2. Implement the `ToData` method for serialization and compression +3. Implement the `FromData` method for decompression and deserialization +4. Register the converter in the worker options This pattern is useful when you need to: +- Reduce storage costs and bandwidth usage with compression - Use specific serialization formats for performance or compatibility - Add encryption/decryption to workflow data -- Implement custom compression for large payloads -- Support legacy data formats \ No newline at end of file +- Implement custom compression algorithms (LZ4, Snappy, etc.) 
+- Support legacy data formats +- Add data validation or transformation during serialization + +## Performance Benefits + +The compressed data converter provides: +- **Storage Savings**: Typically 60-80% reduction in data size for JSON payloads +- **Bandwidth Reduction**: Lower network transfer costs and faster data transmission +- **Cost Optimization**: Reduced storage costs in Cadence history +- **Scalability**: Better performance with large payloads \ No newline at end of file diff --git a/cmd/samples/recipes/dataconverter/dataconverter.go b/cmd/samples/recipes/dataconverter/dataconverter.go index 217ea14d..5b186f3c 100644 --- a/cmd/samples/recipes/dataconverter/dataconverter.go +++ b/cmd/samples/recipes/dataconverter/dataconverter.go @@ -2,33 +2,65 @@ package main import ( "bytes" + "compress/gzip" "encoding/json" "fmt" + "io" "reflect" "go.uber.org/cadence/encoded" ) -type jsonDataConverter struct{} +type compressedJSONDataConverter struct{} -func NewJSONDataConverter() encoded.DataConverter { - return &jsonDataConverter{} +func NewCompressedJSONDataConverter() encoded.DataConverter { + return &compressedJSONDataConverter{} } -func (dc *jsonDataConverter) ToData(value ...interface{}) ([]byte, error) { - var buf bytes.Buffer - enc := json.NewEncoder(&buf) +func (dc *compressedJSONDataConverter) ToData(value ...interface{}) ([]byte, error) { + // First, serialize to JSON + var jsonBuf bytes.Buffer + enc := json.NewEncoder(&jsonBuf) for i, obj := range value { err := enc.Encode(obj) if err != nil { return nil, fmt.Errorf("unable to encode argument: %d, %v, with error: %v", i, reflect.TypeOf(obj), err) } } - return buf.Bytes(), nil + + // Then compress the JSON data + var compressedBuf bytes.Buffer + gzipWriter := gzip.NewWriter(&compressedBuf) + + _, err := gzipWriter.Write(jsonBuf.Bytes()) + if err != nil { + return nil, fmt.Errorf("unable to compress data: %v", err) + } + + err = gzipWriter.Close() + if err != nil { + return nil, fmt.Errorf("unable to close gzip writer: %v", err) + } + + return compressedBuf.Bytes(), nil } -func (dc *jsonDataConverter) FromData(input []byte, valuePtr ...interface{}) error { - dec := json.NewDecoder(bytes.NewBuffer(input)) +func (dc *compressedJSONDataConverter) FromData(input []byte, valuePtr ...interface{}) error { + // First, decompress the data + gzipReader, err := gzip.NewReader(bytes.NewBuffer(input)) + if err != nil { + return fmt.Errorf("unable to create gzip reader: %v", err) + } + defer gzipReader.Close() + + // Read the decompressed JSON data + decompressedData, err := io.ReadAll(gzipReader) + if err != nil { + return fmt.Errorf("unable to decompress data: %v", err) + } + + // Then deserialize from JSON + dec := json.NewDecoder(bytes.NewBuffer(decompressedData)) for i, obj := range valuePtr { err := dec.Decode(obj) if err != nil { @@ -37,3 +69,15 @@ func (dc *jsonDataConverter) FromData(input []byte, valuePtr ...interface{}) err } return nil } + +// GetCompressionRatio returns the compression ratio for demonstration purposes +func (dc *compressedJSONDataConverter) GetCompressionRatio(originalData []byte) (float64, error) { + // Simulate the compression process to calculate ratio + compressedData, err := dc.ToData(string(originalData)) + if err != nil { + return 0, err + } + + ratio := float64(len(compressedData)) / float64(len(originalData)) + return ratio, nil +} diff --git a/cmd/samples/recipes/dataconverter/large_payload.go b/cmd/samples/recipes/dataconverter/large_payload.go new file mode 100644 index 00000000..fce629a9 --- /dev/null +++ 
b/cmd/samples/recipes/dataconverter/large_payload.go @@ -0,0 +1,223 @@ +package main + +import ( + "encoding/json" + "fmt" + "strings" + + "go.uber.org/cadence/encoded" +) + +// LargePayload represents a complex data structure with nested objects and arrays +type LargePayload struct { + ID string `json:"id"` + Name string `json:"name"` + Description string `json:"description"` + Metadata map[string]interface{} `json:"metadata"` + Items []Item `json:"items"` + Config Config `json:"config"` + History []HistoryEntry `json:"history"` + Tags []string `json:"tags"` + Stats Statistics `json:"statistics"` +} + +// Item represents a single item in the payload +type Item struct { + ItemID string `json:"item_id"` + Title string `json:"title"` + Description string `json:"description"` + Price float64 `json:"price"` + Categories []string `json:"categories"` + Attributes map[string]string `json:"attributes"` + Reviews []Review `json:"reviews"` + Inventory Inventory `json:"inventory"` +} + +// Review represents a product review +type Review struct { + ReviewID string `json:"review_id"` + UserID string `json:"user_id"` + Rating int `json:"rating"` + Comment string `json:"comment"` + Helpful int `json:"helpful_votes"` + NotHelpful int `json:"not_helpful_votes"` + Date string `json:"date"` + Verified bool `json:"verified_purchase"` + Score float64 `json:"score"` +} + +// Inventory represents inventory information +type Inventory struct { + Quantity int `json:"quantity"` + Location string `json:"location"` + LastUpdated string `json:"last_updated"` + Status string `json:"status"` +} + +// Config represents configuration settings +type Config struct { + Version string `json:"version"` + Environment string `json:"environment"` + Settings map[string]string `json:"settings"` + Features []string `json:"features"` + Limits Limits `json:"limits"` +} + +// Limits represents system limits +type Limits struct { + MaxItems int `json:"max_items"` + MaxRequests int `json:"max_requests_per_minute"` + MaxFileSize int `json:"max_file_size_mb"` + MaxUsers int `json:"max_concurrent_users"` + TimeoutSecs int `json:"timeout_seconds"` +} + +// HistoryEntry represents a historical event +type HistoryEntry struct { + EventID string `json:"event_id"` + Timestamp string `json:"timestamp"` + EventType string `json:"event_type"` + UserID string `json:"user_id"` + Details map[string]string `json:"details"` + Severity string `json:"severity"` +} + +// Statistics represents statistical data +type Statistics struct { + TotalItems int `json:"total_items"` + TotalUsers int `json:"total_users"` + AverageRating float64 `json:"average_rating"` + TotalRevenue float64 `json:"total_revenue"` + ActiveOrders int `json:"active_orders"` + CompletionRate float64 `json:"completion_rate"` +} + +// CreateLargePayload creates a sample large payload with realistic data +func CreateLargePayload() LargePayload { + // Create a large description with repeated text to demonstrate compression + largeDescription := strings.Repeat("This is a comprehensive product catalog containing thousands of items with detailed descriptions, specifications, and user reviews. Each item includes pricing information, inventory status, and customer feedback. The catalog is designed to provide complete information for customers making purchasing decisions. 
", 50) + + // Create sample items + items := make([]Item, 100) + for i := 0; i < 100; i++ { + reviews := make([]Review, 25) + for j := 0; j < 25; j++ { + reviews[j] = Review{ + ReviewID: fmt.Sprintf("review_%d_%d", i, j), + UserID: fmt.Sprintf("user_%d", j), + Rating: 1 + (j % 5), + Comment: strings.Repeat("This is a detailed customer review with comprehensive feedback about the product quality, delivery experience, and overall satisfaction. The customer provides specific details about their experience. ", 3), + Helpful: j * 2, + NotHelpful: j, + Date: "2024-01-15T10:30:00Z", + Verified: j%2 == 0, + Score: float64(1+(j%5)) + float64(j%10)/10.0, + } + } + + attributes := make(map[string]string) + for k := 0; k < 20; k++ { + attributes[fmt.Sprintf("attr_%d", k)] = strings.Repeat("This is a detailed attribute description with comprehensive information about the product specification. ", 2) + } + + items[i] = Item{ + ItemID: fmt.Sprintf("item_%d", i), + Title: fmt.Sprintf("High-Quality Product %d with Advanced Features", i), + Description: strings.Repeat("This is a premium product with exceptional quality and advanced features designed for professional use. It includes comprehensive documentation and support. ", 10), + Price: float64(100+i*10) + float64(i%100)/100.0, + Categories: []string{"Electronics", "Professional", "Premium", "Advanced"}, + Attributes: attributes, + Reviews: reviews, + Inventory: Inventory{ + Quantity: 100 + i, + Location: fmt.Sprintf("Warehouse %d", i%5), + LastUpdated: "2024-01-15T10:30:00Z", + Status: "In Stock", + }, + } + } + + // Create history entries + history := make([]HistoryEntry, 50) + for i := 0; i < 50; i++ { + details := make(map[string]string) + for j := 0; j < 10; j++ { + details[fmt.Sprintf("detail_%d", j)] = strings.Repeat("This is a detailed event description with comprehensive information about the system event and its impact. ", 2) + } + + history[i] = HistoryEntry{ + EventID: fmt.Sprintf("event_%d", i), + Timestamp: "2024-01-15T10:30:00Z", + EventType: "system_update", + UserID: fmt.Sprintf("admin_%d", i%5), + Details: details, + Severity: "medium", + } + } + + // Create metadata + metadata := make(map[string]interface{}) + for i := 0; i < 30; i++ { + metadata[fmt.Sprintf("meta_key_%d", i)] = strings.Repeat("This is comprehensive metadata information with detailed descriptions and specifications. 
", 5) + } + + return LargePayload{ + ID: "large_payload_001", + Name: "Comprehensive Product Catalog", + Description: largeDescription, + Metadata: metadata, + Items: items, + Config: Config{ + Version: "2.1.0", + Environment: "production", + Settings: map[string]string{ + "cache_enabled": "true", + "compression_level": "high", + "timeout": "30s", + "max_connections": "1000", + "retry_attempts": "3", + }, + Features: []string{"advanced_search", "real_time_updates", "analytics", "reporting", "integration"}, + Limits: Limits{ + MaxItems: 10000, + MaxRequests: 1000, + MaxFileSize: 100, + MaxUsers: 5000, + TimeoutSecs: 30, + }, + }, + History: history, + Tags: []string{"catalog", "products", "inventory", "analytics", "reporting", "integration", "api", "dashboard"}, + Stats: Statistics{ + TotalItems: 10000, + TotalUsers: 5000, + AverageRating: 4.2, + TotalRevenue: 1250000.50, + ActiveOrders: 250, + CompletionRate: 98.5, + }, + } +} + +// GetPayloadSizeInfo returns information about the payload size before and after compression +func GetPayloadSizeInfo(payload LargePayload, converter encoded.DataConverter) (int, int, float64, error) { + // Serialize to JSON to get original size + jsonData, err := json.Marshal(payload) + if err != nil { + return 0, 0, 0, fmt.Errorf("failed to marshal payload: %v", err) + } + originalSize := len(jsonData) + + // Compress using our converter + compressedData, err := converter.ToData(payload) + if err != nil { + return 0, 0, 0, fmt.Errorf("failed to compress payload: %v", err) + } + compressedSize := len(compressedData) + + // Calculate compression ratio + compressionRatio := float64(compressedSize) / float64(originalSize) + compressionPercentage := (1.0 - compressionRatio) * 100 + + return originalSize, compressedSize, compressionPercentage, nil +} diff --git a/cmd/samples/recipes/dataconverter/main.go b/cmd/samples/recipes/dataconverter/main.go index 3870db94..014a5d49 100644 --- a/cmd/samples/recipes/dataconverter/main.go +++ b/cmd/samples/recipes/dataconverter/main.go @@ -2,6 +2,7 @@ package main import ( "flag" + "fmt" "time" "github.com/pborman/uuid" @@ -22,25 +23,43 @@ func startWorkers(h *common.SampleHelper) { FeatureFlags: client.FeatureFlags{ WorkflowExecutionAlreadyCompletedErrorEnabled: true, }, - DataConverter: NewJSONDataConverter(), + DataConverter: NewCompressedJSONDataConverter(), } h.StartWorkers(h.Config.DomainName, ApplicationName, workerOptions) } func startWorkflow(h *common.SampleHelper) { - input := MyPayload{Msg: "hello", Count: 1} + // Create a large payload to demonstrate compression benefits + largeInput := CreateLargePayload() + + // Show compression statistics before starting workflow + converter := NewCompressedJSONDataConverter() + originalSize, compressedSize, compressionPercentage, err := GetPayloadSizeInfo(largeInput, converter) + if err != nil { + fmt.Printf("Error calculating compression stats: %v\n", err) + } else { + fmt.Printf("=== Compression Statistics ===\n") + fmt.Printf("Original JSON size: %d bytes (%.2f KB)\n", originalSize, float64(originalSize)/1024.0) + fmt.Printf("Compressed size: %d bytes (%.2f KB)\n", compressedSize, float64(compressedSize)/1024.0) + fmt.Printf("Compression ratio: %.2f%% reduction\n", compressionPercentage) + fmt.Printf("Space saved: %d bytes (%.2f KB)\n", originalSize-compressedSize, float64(originalSize-compressedSize)/1024.0) + fmt.Printf("=============================\n\n") + } + workflowOptions := client.StartWorkflowOptions{ ID: "dataconverter_" + uuid.New(), TaskList: ApplicationName, 
ExecutionStartToCloseTimeout: time.Minute, DecisionTaskStartToCloseTimeout: time.Minute, } - h.StartWorkflow(workflowOptions, DataConverterWorkflowName, input) + h.StartWorkflow(workflowOptions, LargeDataConverterWorkflowName, largeInput) } func registerWorkflowAndActivity(h *common.SampleHelper) { h.RegisterWorkflowWithAlias(dataConverterWorkflow, DataConverterWorkflowName) h.RegisterActivity(dataConverterActivity) + h.RegisterWorkflowWithAlias(largeDataConverterWorkflow, LargeDataConverterWorkflowName) + h.RegisterActivity(largeDataConverterActivity) } func main() { @@ -49,7 +68,7 @@ func main() { flag.Parse() var h common.SampleHelper - h.DataConverter = NewJSONDataConverter() + h.DataConverter = NewCompressedJSONDataConverter() h.SetupServiceConfig() switch mode { diff --git a/cmd/samples/recipes/dataconverter/workflow.go b/cmd/samples/recipes/dataconverter/workflow.go index 2ea49300..4c91f218 100644 --- a/cmd/samples/recipes/dataconverter/workflow.go +++ b/cmd/samples/recipes/dataconverter/workflow.go @@ -32,10 +32,51 @@ func dataConverterWorkflow(ctx workflow.Context, input MyPayload) (MyPayload, er logger.Error("Activity failed", zap.Error(err)) return MyPayload{}, err } + logger.Info("Workflow completed", zap.Any("result", result)) + logger.Info("Note: All data was automatically compressed/decompressed using gzip compression") + return result, nil +} + +// LargeDataConverterWorkflowName is the workflow name for large payload processing +const LargeDataConverterWorkflowName = "largeDataConverterWorkflow" + +// largeDataConverterWorkflow demonstrates processing large payloads with compression +func largeDataConverterWorkflow(ctx workflow.Context, input LargePayload) (LargePayload, error) { + logger := workflow.GetLogger(ctx) + logger.Info("Large payload workflow started", zap.String("payload_id", input.ID)) + logger.Info("Processing large payload with compression", zap.Int("items_count", len(input.Items))) + + activityOptions := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + } + ctx = workflow.WithActivityOptions(ctx, activityOptions) + + var result LargePayload + err := workflow.ExecuteActivity(ctx, largeDataConverterActivity, input).Get(ctx, &result) + if err != nil { + logger.Error("Large payload activity failed", zap.Error(err)) + return LargePayload{}, err + } + + logger.Info("Large payload workflow completed", zap.String("result_id", result.ID)) + logger.Info("Note: All large payload data was automatically compressed/decompressed using gzip compression") return result, nil } +func largeDataConverterActivity(ctx context.Context, input LargePayload) (LargePayload, error) { + logger := activity.GetLogger(ctx) + logger.Info("Large payload activity received input", zap.String("payload_id", input.ID), zap.Int("items_count", len(input.Items))) + + // Process the large payload (in a real scenario, this might involve data transformation, validation, etc.) 
+ input.Name = input.Name + " (Processed)" + input.Stats.TotalItems = len(input.Items) + + logger.Info("Large payload activity completed", zap.String("result_id", input.ID)) + return input, nil +} + func dataConverterActivity(ctx context.Context, input MyPayload) (MyPayload, error) { logger := activity.GetLogger(ctx) logger.Info("Activity received input", zap.Any("input", input)) diff --git a/cmd/samples/recipes/dataconverter/workflow_test.go b/cmd/samples/recipes/dataconverter/workflow_test.go index c0a3f76d..c804e3b7 100644 --- a/cmd/samples/recipes/dataconverter/workflow_test.go +++ b/cmd/samples/recipes/dataconverter/workflow_test.go @@ -16,7 +16,7 @@ func Test_DataConverterWorkflow(t *testing.T) { env.RegisterWorkflow(dataConverterWorkflow) env.RegisterActivity(dataConverterActivity) - dataConverter := NewJSONDataConverter() + dataConverter := NewCompressedJSONDataConverter() workerOptions := worker.Options{ DataConverter: dataConverter, } @@ -36,3 +36,30 @@ func Test_DataConverterWorkflow(t *testing.T) { require.Equal(t, "test processed", activityResult.Msg) require.Equal(t, 43, activityResult.Count) } + +func Test_LargeDataConverterWorkflow(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + env.RegisterWorkflow(largeDataConverterWorkflow) + env.RegisterActivity(largeDataConverterActivity) + + dataConverter := NewCompressedJSONDataConverter() + workerOptions := worker.Options{ + DataConverter: dataConverter, + } + env.SetWorkerOptions(workerOptions) + + input := CreateLargePayload() + + var activityResult LargePayload + env.SetOnActivityCompletedListener(func(activityInfo *activity.Info, result encoded.Value, err error) { + result.Get(&activityResult) + }) + + env.ExecuteWorkflow(largeDataConverterWorkflow, input) + + require.True(t, env.IsWorkflowCompleted()) + require.NoError(t, env.GetWorkflowError()) + require.Equal(t, "Comprehensive Product Catalog (Processed)", activityResult.Name) + require.Equal(t, 100, activityResult.Stats.TotalItems) +} From 8a548c934bc711c3da5685736fe803a4fbc86fa8 Mon Sep 17 00:00:00 2001 From: "vpatil16@ext.uber.com" Date: Wed, 30 Jul 2025 09:00:41 -0700 Subject: [PATCH 4/7] Refactoring to remove GetCompressionRatio func from dataconverter --- cmd/samples/recipes/dataconverter/dataconverter.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/cmd/samples/recipes/dataconverter/dataconverter.go b/cmd/samples/recipes/dataconverter/dataconverter.go index 5b186f3c..326eef34 100644 --- a/cmd/samples/recipes/dataconverter/dataconverter.go +++ b/cmd/samples/recipes/dataconverter/dataconverter.go @@ -69,15 +69,3 @@ func (dc *compressedJSONDataConverter) FromData(input []byte, valuePtr ...interf } return nil } - -// GetCompressionRatio returns the compression ratio for demonstration purposes -func (dc *compressedJSONDataConverter) GetCompressionRatio(originalData []byte) (float64, error) { - // Simulate the compression process to calculate ratio - compressedData, err := dc.ToData(string(originalData)) - if err != nil { - return 0, err - } - - ratio := float64(len(compressedData)) / float64(len(originalData)) - return ratio, nil -} From d75680447434005a4e65269c1cddf8be3062e972 Mon Sep 17 00:00:00 2001 From: "vpatil16@ext.uber.com" Date: Wed, 30 Jul 2025 13:04:18 -0700 Subject: [PATCH 5/7] refactoring to remove unused code --- cmd/samples/recipes/dataconverter/main.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/cmd/samples/recipes/dataconverter/main.go 
b/cmd/samples/recipes/dataconverter/main.go index 014a5d49..fa2af203 100644 --- a/cmd/samples/recipes/dataconverter/main.go +++ b/cmd/samples/recipes/dataconverter/main.go @@ -56,8 +56,6 @@ func startWorkflow(h *common.SampleHelper) { } func registerWorkflowAndActivity(h *common.SampleHelper) { - h.RegisterWorkflowWithAlias(dataConverterWorkflow, DataConverterWorkflowName) - h.RegisterActivity(dataConverterActivity) h.RegisterWorkflowWithAlias(largeDataConverterWorkflow, LargeDataConverterWorkflowName) h.RegisterActivity(largeDataConverterActivity) } From 0bc92aa6c434b1f8a69dcd60b791cdd93f4734e9 Mon Sep 17 00:00:00 2001 From: "vpatil16@ext.uber.com" Date: Wed, 30 Jul 2025 13:07:41 -0700 Subject: [PATCH 6/7] refactoring to remove unused code --- .../recipes/dataconverter/workflow_test.go | 27 ------------------- 1 file changed, 27 deletions(-) diff --git a/cmd/samples/recipes/dataconverter/workflow_test.go b/cmd/samples/recipes/dataconverter/workflow_test.go index c804e3b7..4313cff5 100644 --- a/cmd/samples/recipes/dataconverter/workflow_test.go +++ b/cmd/samples/recipes/dataconverter/workflow_test.go @@ -10,33 +10,6 @@ import ( "go.uber.org/cadence/worker" ) -func Test_DataConverterWorkflow(t *testing.T) { - testSuite := &testsuite.WorkflowTestSuite{} - env := testSuite.NewTestWorkflowEnvironment() - env.RegisterWorkflow(dataConverterWorkflow) - env.RegisterActivity(dataConverterActivity) - - dataConverter := NewCompressedJSONDataConverter() - workerOptions := worker.Options{ - DataConverter: dataConverter, - } - env.SetWorkerOptions(workerOptions) - - input := MyPayload{Msg: "test", Count: 42} - - var activityResult MyPayload - env.SetOnActivityCompletedListener(func(activityInfo *activity.Info, result encoded.Value, err error) { - result.Get(&activityResult) - }) - - env.ExecuteWorkflow(dataConverterWorkflow, input) - - require.True(t, env.IsWorkflowCompleted()) - require.NoError(t, env.GetWorkflowError()) - require.Equal(t, "test processed", activityResult.Msg) - require.Equal(t, 43, activityResult.Count) -} - func Test_LargeDataConverterWorkflow(t *testing.T) { testSuite := &testsuite.WorkflowTestSuite{} env := testSuite.NewTestWorkflowEnvironment() From 4ec66e0efcd62ad70f3bc263270a09feaa6f3527 Mon Sep 17 00:00:00 2001 From: "vpatil16@ext.uber.com" Date: Wed, 30 Jul 2025 14:34:56 -0700 Subject: [PATCH 7/7] refactoring to remove unused code --- cmd/samples/recipes/dataconverter/workflow.go | 33 ------------------- 1 file changed, 33 deletions(-) diff --git a/cmd/samples/recipes/dataconverter/workflow.go b/cmd/samples/recipes/dataconverter/workflow.go index 4c91f218..2edc1a51 100644 --- a/cmd/samples/recipes/dataconverter/workflow.go +++ b/cmd/samples/recipes/dataconverter/workflow.go @@ -14,30 +14,6 @@ type MyPayload struct { Count int } -const DataConverterWorkflowName = "dataConverterWorkflow" - -func dataConverterWorkflow(ctx workflow.Context, input MyPayload) (MyPayload, error) { - logger := workflow.GetLogger(ctx) - logger.Info("Workflow started", zap.Any("input", input)) - - activityOptions := workflow.ActivityOptions{ - ScheduleToStartTimeout: time.Minute, - StartToCloseTimeout: time.Minute, - } - ctx = workflow.WithActivityOptions(ctx, activityOptions) - - var result MyPayload - err := workflow.ExecuteActivity(ctx, dataConverterActivity, input).Get(ctx, &result) - if err != nil { - logger.Error("Activity failed", zap.Error(err)) - return MyPayload{}, err - } - - logger.Info("Workflow completed", zap.Any("result", result)) - logger.Info("Note: All data was automatically 
compressed/decompressed using gzip compression") - return result, nil -} - // LargeDataConverterWorkflowName is the workflow name for large payload processing const LargeDataConverterWorkflowName = "largeDataConverterWorkflow" @@ -76,12 +52,3 @@ func largeDataConverterActivity(ctx context.Context, input LargePayload) (LargeP logger.Info("Large payload activity completed", zap.String("result_id", input.ID)) return input, nil } - -func dataConverterActivity(ctx context.Context, input MyPayload) (MyPayload, error) { - logger := activity.GetLogger(ctx) - logger.Info("Activity received input", zap.Any("input", input)) - input.Msg = input.Msg + " processed" - input.Count++ - logger.Info("Activity returning", zap.Any("output", input)) - return input, nil -}