Commit 7015866

data converters with compression
1 parent 5c9b374 commit 7015866

File tree: 6 files changed, +410 -32 lines changed

Lines changed: 42 additions & 18 deletions
@@ -1,23 +1,33 @@
 # Data Converter Sample
 
-This sample workflow demonstrates how to use custom data converters in Cadence workflows. The data converter is responsible for serializing and deserializing workflow inputs, outputs, and activity parameters.
+This sample workflow demonstrates how to use custom data converters in Cadence workflows with compression capabilities. The data converter is responsible for serializing and deserializing workflow inputs, outputs, and activity parameters, with the added benefit of data compression to save storage space and bandwidth.
 
 ## Sample Description
 
-The sample implements a custom JSON data converter that:
+The sample implements a custom compressed JSON data converter that:
 - Serializes workflow inputs and activity parameters to JSON format
-- Deserializes workflow outputs and activity results from JSON format
-- Provides better control over data serialization compared to the default data converter
-- Can be extended to support custom serialization formats (e.g., Protocol Buffers, MessagePack)
+- Compresses the JSON data using gzip compression to reduce size
+- Decompresses and deserializes workflow outputs and activity results from JSON format
+- Provides significant storage and bandwidth savings for large payloads
+- Demonstrates advanced data converter patterns for production use cases
+- Shows real-time compression statistics and size comparisons
 
-The workflow takes a `MyPayload` struct as input, processes it through an activity, and returns the modified payload.
+The sample includes two workflows:
+1. **Simple Workflow**: Processes a basic `MyPayload` struct
+2. **Large Payload Workflow**: Processes a complex `LargePayload` with nested objects, arrays, and extensive data to demonstrate compression benefits
+
+All data is automatically compressed during serialization and decompressed during deserialization, with compression statistics displayed at runtime.
 
 ## Key Components
 
-- **Custom Data Converter**: `jsonDataConverter` implements the `encoded.DataConverter` interface
-- **Workflow**: `dataConverterWorkflow` demonstrates using custom data types with the converter
-- **Activity**: `dataConverterActivity` processes the input and returns modified data
-- **Test**: Includes unit tests to verify the data converter functionality
+- **Custom Data Converter**: `compressedJSONDataConverter` implements the `encoded.DataConverter` interface with gzip compression
+- **Simple Workflow**: `dataConverterWorkflow` demonstrates basic payload processing with compression
+- **Large Payload Workflow**: `largeDataConverterWorkflow` demonstrates processing complex data structures with compression
+- **Activities**: `dataConverterActivity` and `largeDataConverterActivity` process different payload types
+- **Large Payload Generator**: `CreateLargePayload()` creates realistic complex data for compression demonstration
+- **Compression Statistics**: `GetPayloadSizeInfo()` shows before/after compression metrics
+- **Tests**: Includes unit tests for both simple and large payload workflows
+- **Compression**: Automatic gzip compression/decompression for all workflow data
 
 ## Steps to Run Sample
 
@@ -33,18 +43,32 @@ The workflow takes a `MyPayload` struct as input, processes it through an activi
 ./bin/dataconverter -m trigger
 ```
 
-You should see logs showing the workflow input being processed through the activity and the final result being returned.
+You should see:
+- Compression statistics showing original vs compressed data sizes
+- Workflow logs showing the processing of large payloads
+- Activity execution logs with payload information
+- Final workflow completion with compression benefits noted
 
 ## Customization
 
-To use a different serialization format, you can implement your own data converter by:
-1. Creating a struct that implements the `encoded.DataConverter` interface
-2. Implementing the `ToData` method for serialization
-3. Implementing the `FromData` method for deserialization
-4. Registering the converter in the worker options
+To implement your own data converter with compression or other features:
+1. Create a struct that implements the `encoded.DataConverter` interface
+2. Implement the `ToData` method for serialization and compression
+3. Implement the `FromData` method for decompression and deserialization
+4. Register the converter in the worker options
 
 This pattern is useful when you need to:
+- Reduce storage costs and bandwidth usage with compression
 - Use specific serialization formats for performance or compatibility
 - Add encryption/decryption to workflow data
-- Implement custom compression for large payloads
-- Support legacy data formats
+- Implement custom compression algorithms (LZ4, Snappy, etc.)
+- Support legacy data formats
+- Add data validation or transformation during serialization
+
+## Performance Benefits
+
+The compressed data converter provides:
+- **Storage Savings**: Typically 60-80% reduction in data size for JSON payloads
+- **Bandwidth Reduction**: Lower network transfer costs and faster data transmission
+- **Cost Optimization**: Reduced storage costs in Cadence history
+- **Scalability**: Better performance with large payloads
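
For context on step 4 of the Customization list above, here is a minimal sketch of how a converter like this is plugged into a worker, assuming the standard `go.uber.org/cadence/worker` API. The `startDemoWorker` helper and its domain/task-list parameters are illustrative only and are not part of this commit:

```go
package main

import (
	"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
	"go.uber.org/cadence/worker"
)

// startDemoWorker (hypothetical helper) wires the compressed converter into
// worker.Options so every workflow and activity payload handled by this
// worker is serialized as gzip-compressed JSON.
func startDemoWorker(service workflowserviceclient.Interface, domain, taskList string) (worker.Worker, error) {
	options := worker.Options{
		DataConverter: NewCompressedJSONDataConverter(),
	}
	w := worker.New(service, domain, taskList, options)
	if err := w.Start(); err != nil {
		return nil, err
	}
	return w, nil
}
```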

cmd/samples/recipes/dataconverter/dataconverter.go

Lines changed: 53 additions & 9 deletions
@@ -2,33 +2,65 @@ package main
 
 import (
     "bytes"
+    "compress/gzip"
     "encoding/json"
     "fmt"
+    "io"
     "reflect"
 
     "go.uber.org/cadence/encoded"
 )
 
-type jsonDataConverter struct{}
+type compressedJSONDataConverter struct{}
 
-func NewJSONDataConverter() encoded.DataConverter {
-    return &jsonDataConverter{}
+func NewCompressedJSONDataConverter() encoded.DataConverter {
+    return &compressedJSONDataConverter{}
 }
 
-func (dc *jsonDataConverter) ToData(value ...interface{}) ([]byte, error) {
-    var buf bytes.Buffer
-    enc := json.NewEncoder(&buf)
+func (dc *compressedJSONDataConverter) ToData(value ...interface{}) ([]byte, error) {
+    // First, serialize to JSON
+    var jsonBuf bytes.Buffer
+    enc := json.NewEncoder(&jsonBuf)
     for i, obj := range value {
         err := enc.Encode(obj)
         if err != nil {
             return nil, fmt.Errorf("unable to encode argument: %d, %v, with error: %v", i, reflect.TypeOf(obj), err)
         }
     }
-    return buf.Bytes(), nil
+
+    // Then compress the JSON data
+    var compressedBuf bytes.Buffer
+    gzipWriter := gzip.NewWriter(&compressedBuf)
+
+    _, err := gzipWriter.Write(jsonBuf.Bytes())
+    if err != nil {
+        return nil, fmt.Errorf("unable to compress data: %v", err)
+    }
+
+    err = gzipWriter.Close()
+    if err != nil {
+        return nil, fmt.Errorf("unable to close gzip writer: %v", err)
+    }
+
+    return compressedBuf.Bytes(), nil
 }
 
-func (dc *jsonDataConverter) FromData(input []byte, valuePtr ...interface{}) error {
-    dec := json.NewDecoder(bytes.NewBuffer(input))
+func (dc *compressedJSONDataConverter) FromData(input []byte, valuePtr ...interface{}) error {
+    // First, decompress the data
+    gzipReader, err := gzip.NewReader(bytes.NewBuffer(input))
+    if err != nil {
+        return fmt.Errorf("unable to create gzip reader: %v", err)
+    }
+    defer gzipReader.Close()
+
+    // Read the decompressed JSON data
+    decompressedData, err := io.ReadAll(gzipReader)
+    if err != nil {
+        return fmt.Errorf("unable to decompress data: %v", err)
+    }
+
+    // Then deserialize from JSON
+    dec := json.NewDecoder(bytes.NewBuffer(decompressedData))
     for i, obj := range valuePtr {
         err := dec.Decode(obj)
         if err != nil {
@@ -37,3 +69,15 @@ func (dc *jsonDataConverter) FromData(input []byte, valuePtr ...interface{}) err
     }
     return nil
 }
+
+// GetCompressionRatio returns the compression ratio for demonstration purposes
+func (dc *compressedJSONDataConverter) GetCompressionRatio(originalData []byte) (float64, error) {
+    // Simulate the compression process to calculate ratio
+    compressedData, err := dc.ToData(string(originalData))
+    if err != nil {
+        return 0, err
+    }
+
+    ratio := float64(len(compressedData)) / float64(len(originalData))
+    return ratio, nil
+}
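
As a rough usage sketch (not part of this commit), the converter can also be exercised directly, for example from a unit test. The `demoPayload` type and the sample values below are illustrative stand-ins; the sample's own `MyPayload`/`LargePayload` types live elsewhere in the package:

```go
package main

import (
	"fmt"
	"strings"
)

// demoPayload is a hypothetical stand-in for the sample's payload structs.
type demoPayload struct {
	Name string
	Data string
}

func demoRoundTrip() error {
	dc := NewCompressedJSONDataConverter()

	in := demoPayload{Name: "example", Data: strings.Repeat("highly compressible ", 500)}

	// ToData JSON-encodes the value, then gzips the JSON bytes.
	blob, err := dc.ToData(in)
	if err != nil {
		return err
	}

	// FromData gunzips the bytes, then JSON-decodes them into the pointer.
	var out demoPayload
	if err := dc.FromData(blob, &out); err != nil {
		return err
	}

	fmt.Printf("compressed payload: %d bytes, round-trip intact: %v\n", len(blob), in == out)
	return nil
}
```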
