# Workflow for Data Converter #101
Merged (+546 −0)
Commits:

- ca038c6 Workflow for DataConverter (vishwa-uber)
- b2fe6c0 Workflow for DataConverter (vishwa-uber)
- 5c9b374 Merge branch 'master' into data_converter (vishwa-uber)
- 7015866 data converters with compression (vishwa-uber)
- 8a548c9 Refactoring to remove GetCompressionRatio func from dataconverter (vishwa-uber)
- d756804 refactoring to remove unused code (vishwa-uber)
- 0bc92aa refactoring to remove unused code (vishwa-uber)
- 4ec66e0 refactoring to remove unused code (vishwa-uber)
# Data Converter Sample

This sample workflow demonstrates how to use custom data converters in Cadence workflows with compression capabilities. The data converter is responsible for serializing and deserializing workflow inputs, outputs, and activity parameters, with the added benefit of data compression to save storage space and bandwidth.
## Sample Description

The sample implements a custom compressed JSON data converter that:

- Serializes workflow inputs and activity parameters to JSON format
- Compresses the JSON data using gzip compression to reduce size
- Decompresses and deserializes workflow outputs and activity results from JSON format
- Provides significant storage and bandwidth savings for large payloads
- Demonstrates advanced data converter patterns for production use cases
- Shows real-time compression statistics and size comparisons
The sample includes two workflows:

1. **Simple Workflow**: Processes a basic `MyPayload` struct
2. **Large Payload Workflow**: Processes a complex `LargePayload` with nested objects, arrays, and extensive data to demonstrate compression benefits

All data is automatically compressed during serialization and decompressed during deserialization, with compression statistics displayed at runtime.
## Key Components

- **Custom Data Converter**: `compressedJSONDataConverter` implements the `encoded.DataConverter` interface with gzip compression
- **Simple Workflow**: `dataConverterWorkflow` demonstrates basic payload processing with compression
- **Large Payload Workflow**: `largeDataConverterWorkflow` demonstrates processing complex data structures with compression
- **Activities**: `dataConverterActivity` and `largeDataConverterActivity` process different payload types
- **Large Payload Generator**: `CreateLargePayload()` creates realistic complex data for compression demonstration
- **Compression Statistics**: `GetPayloadSizeInfo()` shows before/after compression metrics
- **Tests**: Includes unit tests for both simple and large payload workflows
- **Compression**: Automatic gzip compression/decompression for all workflow data
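For orientation, the interface such a converter implements has two variadic methods. The sketch below re-declares a `DataConverter` interface with the same shape as `encoded.DataConverter` (so it compiles without the Cadence client) and pairs it with a minimal, compression-free JSON implementation:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// DataConverter mirrors the shape of go.uber.org/cadence/encoded.DataConverter,
// re-declared locally so this sketch compiles without the client library.
type DataConverter interface {
	ToData(value ...interface{}) ([]byte, error)
	FromData(input []byte, valuePtr ...interface{}) error
}

// jsonConverter is a minimal implementation for illustration only:
// plain JSON, no compression.
type jsonConverter struct{}

func (jsonConverter) ToData(value ...interface{}) ([]byte, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	for _, v := range value {
		if err := enc.Encode(v); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}

func (jsonConverter) FromData(input []byte, valuePtr ...interface{}) error {
	dec := json.NewDecoder(bytes.NewReader(input))
	for _, v := range valuePtr {
		if err := dec.Decode(v); err != nil {
			return err
		}
	}
	return nil
}

var _ DataConverter = jsonConverter{} // compile-time interface check

func main() {
	var dc DataConverter = jsonConverter{}
	data, _ := dc.ToData("hello", 42)
	var s string
	var n int
	if err := dc.FromData(data, &s, &n); err != nil {
		panic(err)
	}
	fmt.Println(s, n) // hello 42
}
```

The compressed converter in this PR keeps exactly this structure and adds a gzip stage around the JSON bytes.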
## Steps to Run Sample

1. You need a Cadence service running. See details in cmd/samples/README.md.

2. Run the following command to start the worker:
```
./bin/dataconverter -m worker
```

3. Run the following command to execute the workflow:
```
./bin/dataconverter -m trigger
```
You should see:

- Compression statistics showing original vs. compressed data sizes
- Workflow logs showing the processing of large payloads
- Activity execution logs with payload information
- Final workflow completion with compression benefits noted
## Customization

To implement your own data converter with compression or other features:

1. Create a struct that implements the `encoded.DataConverter` interface
2. Implement the `ToData` method for serialization and compression
3. Implement the `FromData` method for decompression and deserialization
4. Register the converter in the worker options
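Step 4 might be sketched as below. This fragment is not standalone: it assumes the `go.uber.org/cadence/worker` package and the generated `workflowserviceclient` bindings, and the `buildWorker` helper name is purely illustrative; check your client version for the exact `worker.New` signature.

```go
// Fragment (not standalone): registering the converter when building the worker.
import (
	"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
	"go.uber.org/cadence/worker"
)

// buildWorker is a hypothetical helper showing where the converter is wired in.
func buildWorker(service workflowserviceclient.Interface, domain, taskList string) worker.Worker {
	workerOptions := worker.Options{
		// All workflow and activity payloads handled by this worker will
		// now pass through the compressed JSON converter.
		DataConverter: NewCompressedJSONDataConverter(),
	}
	return worker.New(service, domain, taskList, workerOptions)
}
```

Note that every worker (and any client that starts or queries these workflows) must use the same converter, or payloads will fail to decode.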
This pattern is useful when you need to:

- Reduce storage costs and bandwidth usage with compression
- Use specific serialization formats for performance or compatibility
- Add encryption/decryption to workflow data
- Implement custom compression algorithms (LZ4, Snappy, etc.)
- Support legacy data formats
- Add data validation or transformation during serialization
## Performance Benefits

The compressed data converter provides:

- **Storage Savings**: Typically 60-80% reduction in data size for JSON payloads
- **Bandwidth Reduction**: Lower network transfer costs and faster data transmission
- **Cost Optimization**: Reduced storage costs in Cadence history
- **Scalability**: Better performance with large payloads
```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
	"reflect"

	"go.uber.org/cadence/encoded"
)

type compressedJSONDataConverter struct{}

func NewCompressedJSONDataConverter() encoded.DataConverter {
	return &compressedJSONDataConverter{}
}

func (dc *compressedJSONDataConverter) ToData(value ...interface{}) ([]byte, error) {
	// First, serialize to JSON
	var jsonBuf bytes.Buffer
	enc := json.NewEncoder(&jsonBuf)
	for i, obj := range value {
		err := enc.Encode(obj)
		if err != nil {
			return nil, fmt.Errorf("unable to encode argument: %d, %v, with error: %v", i, reflect.TypeOf(obj), err)
		}
	}

	// Then compress the JSON data
	var compressedBuf bytes.Buffer
	gzipWriter := gzip.NewWriter(&compressedBuf)

	_, err := gzipWriter.Write(jsonBuf.Bytes())
	if err != nil {
		return nil, fmt.Errorf("unable to compress data: %v", err)
	}

	err = gzipWriter.Close()
	if err != nil {
		return nil, fmt.Errorf("unable to close gzip writer: %v", err)
	}

	return compressedBuf.Bytes(), nil
}

func (dc *compressedJSONDataConverter) FromData(input []byte, valuePtr ...interface{}) error {
	// First, decompress the data
	gzipReader, err := gzip.NewReader(bytes.NewBuffer(input))
	if err != nil {
		return fmt.Errorf("unable to create gzip reader: %v", err)
	}
	defer gzipReader.Close()

	// Read the decompressed JSON data
	decompressedData, err := io.ReadAll(gzipReader)
	if err != nil {
		return fmt.Errorf("unable to decompress data: %v", err)
	}

	// Then deserialize from JSON
	dec := json.NewDecoder(bytes.NewBuffer(decompressedData))
	for i, obj := range valuePtr {
		err := dec.Decode(obj)
		if err != nil {
			return fmt.Errorf("unable to decode argument: %d, %v, with error: %v", i, reflect.TypeOf(obj), err)
		}
	}
	return nil
}

// GetCompressionRatio returns the compression ratio for demonstration purposes.
// Note: it wraps originalData in a JSON string before compressing, so the ratio
// includes JSON string-escaping overhead; it was removed in a later commit.
func (dc *compressedJSONDataConverter) GetCompressionRatio(originalData []byte) (float64, error) {
	// Simulate the compression process to calculate the ratio
	compressedData, err := dc.ToData(string(originalData))
	if err != nil {
		return 0, err
	}

	ratio := float64(len(compressedData)) / float64(len(originalData))
	return ratio, nil
}
```
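A quick standalone round-trip check of the converter logic above; the type is re-declared here, without the `encoded.DataConverter` constructor, so the snippet runs without the Cadence client:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
)

// compressedJSONDataConverter re-declares the converter from this PR
// (JSON encode, then gzip) so the round-trip check is self-contained.
type compressedJSONDataConverter struct{}

func (dc *compressedJSONDataConverter) ToData(value ...interface{}) ([]byte, error) {
	var jsonBuf bytes.Buffer
	enc := json.NewEncoder(&jsonBuf)
	for _, obj := range value {
		if err := enc.Encode(obj); err != nil {
			return nil, err
		}
	}
	var out bytes.Buffer
	zw := gzip.NewWriter(&out)
	if _, err := zw.Write(jsonBuf.Bytes()); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return out.Bytes(), nil
}

func (dc *compressedJSONDataConverter) FromData(input []byte, valuePtr ...interface{}) error {
	zr, err := gzip.NewReader(bytes.NewReader(input))
	if err != nil {
		return err
	}
	defer zr.Close()
	raw, err := io.ReadAll(zr)
	if err != nil {
		return err
	}
	dec := json.NewDecoder(bytes.NewReader(raw))
	for _, obj := range valuePtr {
		if err := dec.Decode(obj); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	dc := &compressedJSONDataConverter{}
	data, err := dc.ToData("workflow-input", 123)
	if err != nil {
		panic(err)
	}
	var s string
	var n int
	if err := dc.FromData(data, &s, &n); err != nil {
		panic(err)
	}
	fmt.Println(s, n) // workflow-input 123
}
```

Because `ToData` JSON-encodes each argument on its own line before compressing, `FromData` must decode the arguments in the same order and with matching pointer types.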
Review comment: I don't think anyone is using this `GetCompressionRatio` func.

Reply: removed it, thanks