
Commit 3ad2d71

Workflow for Data Converter (#101)
* Workflow for DataConverter
1 parent b86320d commit 3ad2d71

File tree

7 files changed: +546 -0 lines changed


Makefile

Lines changed: 6 additions & 0 deletions
```diff
@@ -36,6 +36,7 @@ PROGS = helloworld \
 	signalcounter \
 	sideeffect \
 	sleep \
+	dataconverter \
 
 TEST_ARG ?= -race -v -timeout 5m
 BUILD := ./build
@@ -70,6 +71,7 @@ TEST_DIRS=./cmd/samples/cron \
 	./cmd/samples/recipes/sideeffect \
 	./cmd/samples/recipes/signalcounter \
 	./cmd/samples/recipes/sleep \
+	./cmd/samples/recipes/dataconverter \
 	./cmd/samples/recovery \
 	./cmd/samples/pso \
 
@@ -181,6 +183,9 @@ sideeffect:
 versioning:
 	go build -o bin/versioning cmd/samples/recipes/versioning/*.go
 
+dataconverter:
+	go build -o bin/dataconverter cmd/samples/recipes/dataconverter/*.go
+
 bins: helloworld \
 	versioning \
 	delaystart \
@@ -213,6 +218,7 @@ bins: helloworld \
 	signalcounter \
 	sideeffect \
 	sleep \
+	dataconverter \
 
 test: bins
 	@rm -f test
```
Lines changed: 74 additions & 0 deletions
# Data Converter Sample

This sample workflow demonstrates how to use custom data converters in Cadence workflows, with compression added on top. The data converter is responsible for serializing and deserializing workflow inputs, outputs, and activity parameters; compressing that data saves storage space and bandwidth.

## Sample Description

The sample implements a custom compressed JSON data converter that:

- Serializes workflow inputs and activity parameters to JSON
- Compresses the JSON data with gzip to reduce its size
- Decompresses and deserializes workflow outputs and activity results from JSON
- Provides significant storage and bandwidth savings for large payloads
- Demonstrates data converter patterns suitable for production use
- Shows compression statistics and size comparisons at runtime

The sample includes two workflows:

1. **Simple Workflow**: Processes a basic `MyPayload` struct
2. **Large Payload Workflow**: Processes a complex `LargePayload` with nested objects, arrays, and extensive data to demonstrate compression benefits

All data is automatically compressed during serialization and decompressed during deserialization, with compression statistics displayed at runtime.
## Key Components

- **Custom Data Converter**: `compressedJSONDataConverter` implements the `encoded.DataConverter` interface with gzip compression
- **Simple Workflow**: `dataConverterWorkflow` demonstrates basic payload processing with compression
- **Large Payload Workflow**: `largeDataConverterWorkflow` demonstrates processing complex data structures with compression
- **Activities**: `dataConverterActivity` and `largeDataConverterActivity` process the two payload types
- **Large Payload Generator**: `CreateLargePayload()` creates realistic complex data for the compression demonstration
- **Compression Statistics**: `GetPayloadSizeInfo()` shows before/after compression metrics
- **Tests**: Unit tests for both the simple and large payload workflows
- **Compression**: Automatic gzip compression/decompression for all workflow data
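
The workflow and activity implementations are not included in this excerpt of the commit. As a rough, illustrative sketch of how a workflow such as `dataConverterWorkflow` could be shaped (the payload fields, timeouts, and activity body below are assumptions, not the sample's actual code):

```go
package main

import (
	"context"
	"time"

	"go.uber.org/cadence/workflow"
)

// MyPayload here is a placeholder; the sample's real struct may differ.
type MyPayload struct {
	Msg   string
	Count int
}

// dataConverterActivity is an illustrative stub that just updates and echoes
// the payload back to the workflow.
func dataConverterActivity(ctx context.Context, payload MyPayload) (MyPayload, error) {
	payload.Count++
	return payload, nil
}

// dataConverterWorkflow passes the payload to an activity; the registered data
// converter compresses it transparently on the way in and out.
func dataConverterWorkflow(ctx workflow.Context, payload MyPayload) (MyPayload, error) {
	ao := workflow.ActivityOptions{
		ScheduleToStartTimeout: time.Minute,
		StartToCloseTimeout:    time.Minute,
	}
	ctx = workflow.WithActivityOptions(ctx, ao)

	var result MyPayload
	err := workflow.ExecuteActivity(ctx, dataConverterActivity, payload).Get(ctx, &result)
	return result, err
}
```

Note that the workflow code never touches the converter directly; compression happens wherever arguments and results are encoded for the Cadence service.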
## Steps to Run Sample

1. You need a Cadence service running. See details in cmd/samples/README.md.

2. Run the following command to start the worker:

```
./bin/dataconverter -m worker
```

3. Run the following command to execute the workflow:

```
./bin/dataconverter -m trigger
```

You should see:

- Compression statistics showing original vs. compressed data sizes
- Workflow logs showing the processing of large payloads
- Activity execution logs with payload information
- Final workflow completion with the compression benefits noted
## Customization

To implement your own data converter with compression or other features:

1. Create a struct that implements the `encoded.DataConverter` interface
2. Implement the `ToData` method for serialization and compression
3. Implement the `FromData` method for decompression and deserialization
4. Register the converter in the worker options (see the sketch after this list)
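
For step 4, a minimal sketch of wiring the converter into a worker follows; the helper name, domain, and task list values are placeholders rather than the sample's actual configuration.

```go
package main

import (
	"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
	"go.uber.org/cadence/worker"
)

// startWorkerWithConverter is a hypothetical helper showing where the custom
// converter is plugged in; the domain and task list names are placeholders.
func startWorkerWithConverter(service workflowserviceclient.Interface) worker.Worker {
	options := worker.Options{
		// All workflow/activity inputs and results pass through this converter.
		DataConverter: NewCompressedJSONDataConverter(),
	}
	w := worker.New(service, "samples-domain", "dataConverterTaskList", options)
	if err := w.Start(); err != nil {
		panic(err)
	}
	return w
}
```

The process that starts workflows generally needs the same converter set in its client options as well, so that inputs are encoded the same way the worker decodes them.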
This pattern is useful when you need to:

- Reduce storage costs and bandwidth usage with compression
- Use specific serialization formats for performance or compatibility
- Add encryption/decryption to workflow data
- Implement custom compression algorithms (LZ4, Snappy, etc.)
- Support legacy data formats
- Add data validation or transformation during serialization
## Performance Benefits

The compressed data converter provides:

- **Storage Savings**: Typically 60-80% reduction in data size for JSON payloads
- **Bandwidth Reduction**: Lower network transfer costs and faster data transmission
- **Cost Optimization**: Reduced storage costs in Cadence history
- **Scalability**: Better performance with large payloads
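
The exact ratio depends on the payload. One rough way to check it for your own data is to compare the plain JSON encoding with the converter's output; the helper below is only a sketch and is not the sample's `GetPayloadSizeInfo` implementation.

```go
package main

import (
	"encoding/json"
	"fmt"

	"go.uber.org/cadence/encoded"
)

// printSizeInfo is a hypothetical helper: it compares the plain JSON size of a
// value with the size of the converter's compressed output.
func printSizeInfo(dc encoded.DataConverter, v interface{}) error {
	raw, err := json.Marshal(v)
	if err != nil {
		return err
	}
	compressed, err := dc.ToData(v)
	if err != nil {
		return err
	}
	fmt.Printf("original: %d bytes, compressed: %d bytes (%.1f%% of original)\n",
		len(raw), len(compressed), 100*float64(len(compressed))/float64(len(raw)))
	return nil
}
```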
Lines changed: 71 additions & 0 deletions
```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
	"reflect"

	"go.uber.org/cadence/encoded"
)

type compressedJSONDataConverter struct{}

func NewCompressedJSONDataConverter() encoded.DataConverter {
	return &compressedJSONDataConverter{}
}

func (dc *compressedJSONDataConverter) ToData(value ...interface{}) ([]byte, error) {
	// First, serialize to JSON
	var jsonBuf bytes.Buffer
	enc := json.NewEncoder(&jsonBuf)
	for i, obj := range value {
		err := enc.Encode(obj)
		if err != nil {
			return nil, fmt.Errorf("unable to encode argument: %d, %v, with error: %v", i, reflect.TypeOf(obj), err)
		}
	}

	// Then compress the JSON data
	var compressedBuf bytes.Buffer
	gzipWriter := gzip.NewWriter(&compressedBuf)

	_, err := gzipWriter.Write(jsonBuf.Bytes())
	if err != nil {
		return nil, fmt.Errorf("unable to compress data: %v", err)
	}

	err = gzipWriter.Close()
	if err != nil {
		return nil, fmt.Errorf("unable to close gzip writer: %v", err)
	}

	return compressedBuf.Bytes(), nil
}

func (dc *compressedJSONDataConverter) FromData(input []byte, valuePtr ...interface{}) error {
	// First, decompress the data
	gzipReader, err := gzip.NewReader(bytes.NewBuffer(input))
	if err != nil {
		return fmt.Errorf("unable to create gzip reader: %v", err)
	}
	defer gzipReader.Close()

	// Read the decompressed JSON data
	decompressedData, err := io.ReadAll(gzipReader)
	if err != nil {
		return fmt.Errorf("unable to decompress data: %v", err)
	}

	// Then deserialize from JSON
	dec := json.NewDecoder(bytes.NewBuffer(decompressedData))
	for i, obj := range valuePtr {
		err := dec.Decode(obj)
		if err != nil {
			return fmt.Errorf("unable to decode argument: %d, %v, with error: %v", i, reflect.TypeOf(obj), err)
		}
	}
	return nil
}
```
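
The README above mentions unit tests for both workflows, but those files are not shown in this excerpt. As a minimal, hypothetical round-trip check of this converter (the test name and payload struct are stand-ins, not the sample's tests):

```go
package main

import "testing"

// TestCompressedJSONDataConverterRoundTrip is a sketch, not the sample's test:
// it encodes a value, decodes it back, and checks that nothing was lost.
func TestCompressedJSONDataConverterRoundTrip(t *testing.T) {
	type payload struct {
		Msg   string
		Count int
	}

	dc := NewCompressedJSONDataConverter()

	in := payload{Msg: "hello", Count: 42}
	data, err := dc.ToData(in)
	if err != nil {
		t.Fatalf("ToData failed: %v", err)
	}

	var out payload
	if err := dc.FromData(data, &out); err != nil {
		t.Fatalf("FromData failed: %v", err)
	}
	if out != in {
		t.Fatalf("round trip mismatch: got %+v, want %+v", out, in)
	}
}
```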
