6 changes: 6 additions & 0 deletions Makefile
@@ -36,6 +36,7 @@ PROGS = helloworld \
	signalcounter \
	sideeffect \
	sleep \
	dataconverter \

TEST_ARG ?= -race -v -timeout 5m
BUILD := ./build
@@ -70,6 +71,7 @@ TEST_DIRS=./cmd/samples/cron \
	./cmd/samples/recipes/sideeffect \
	./cmd/samples/recipes/signalcounter \
	./cmd/samples/recipes/sleep \
	./cmd/samples/recipes/dataconverter \
	./cmd/samples/recovery \
	./cmd/samples/pso \

@@ -181,6 +183,9 @@ sideeffect:
versioning:
	go build -o bin/versioning cmd/samples/recipes/versioning/*.go

dataconverter:
	go build -o bin/dataconverter cmd/samples/recipes/dataconverter/*.go

bins: helloworld \
	versioning \
	delaystart \
@@ -213,6 +218,7 @@ bins: helloworld \
	signalcounter \
	sideeffect \
	sleep \
	dataconverter \

test: bins
	@rm -f test
74 changes: 74 additions & 0 deletions cmd/samples/recipes/dataconverter/README.md
@@ -0,0 +1,74 @@
# Data Converter Sample

This sample demonstrates how to use a custom data converter with compression in Cadence workflows. The data converter serializes and deserializes workflow inputs, outputs, and activity parameters, and compresses the serialized data to save storage space and bandwidth.

## Sample Description

The sample implements a custom compressed JSON data converter that:
- Serializes workflow inputs and activity parameters to JSON format
- Compresses the JSON data using gzip compression to reduce size
- Decompresses and deserializes workflow outputs and activity results from JSON format
- Reduces storage and bandwidth use, most noticeably for large payloads
- Demonstrates a data converter pattern suitable for production use
- Logs compression statistics and size comparisons at runtime

The sample includes two workflows:
1. **Simple Workflow**: Processes a basic `MyPayload` struct
2. **Large Payload Workflow**: Processes a complex `LargePayload` with nested objects, arrays, and extensive data to demonstrate compression benefits

All data is automatically compressed during serialization and decompressed during deserialization, with compression statistics displayed at runtime.
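
The workflow code itself does not change when a custom converter is installed; compression happens inside the converter, outside the workflow logic. As an illustration only (the sample's actual workflow file is not reproduced in this excerpt), the simple workflow has roughly this shape, using the `MyPayload` struct and `dataConverterActivity` listed under Key Components below:

```
package main

import (
	"time"

	"go.uber.org/cadence/workflow"
)

// Sketch only: the sample's real workflow lives in its own file, which is
// not reproduced in this excerpt. MyPayload and dataConverterActivity are
// the names listed under Key Components.
func dataConverterWorkflowSketch(ctx workflow.Context, payload MyPayload) (MyPayload, error) {
	ao := workflow.ActivityOptions{
		ScheduleToStartTimeout: time.Minute,
		StartToCloseTimeout:    time.Minute,
	}
	ctx = workflow.WithActivityOptions(ctx, ao)

	// The data converter compresses the payload before it is written to
	// history and shipped to the activity worker; the workflow code
	// itself never deals with gzip.
	var result MyPayload
	err := workflow.ExecuteActivity(ctx, dataConverterActivity, payload).Get(ctx, &result)
	return result, err
}
```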

## Key Components

- **Custom Data Converter**: `compressedJSONDataConverter` implements the `encoded.DataConverter` interface with gzip compression
- **Simple Workflow**: `dataConverterWorkflow` demonstrates basic payload processing with compression
- **Large Payload Workflow**: `largeDataConverterWorkflow` demonstrates processing complex data structures with compression
- **Activities**: `dataConverterActivity` and `largeDataConverterActivity` process different payload types
- **Large Payload Generator**: `CreateLargePayload()` creates realistic complex data for compression demonstration
- **Compression Statistics**: `GetPayloadSizeInfo()` shows before/after compression metrics
- **Tests**: Includes unit tests for both simple and large payload workflows
- **Compression**: Automatic gzip compression/decompression for all workflow data

## Steps to Run Sample

1. You need a Cadence service running. See details in cmd/samples/README.md

2. Run the following command to start the worker:
```
./bin/dataconverter -m worker
```

3. Run the following command to execute the workflow:
```
./bin/dataconverter -m trigger
```

You should see:
- Compression statistics showing original vs compressed data sizes
- Workflow logs showing the processing of large payloads
- Activity execution logs with payload information
- Final workflow completion with compression benefits noted

## Customization

To implement your own data converter with compression or other features (a registration sketch follows this list):
1. Create a struct that implements the `encoded.DataConverter` interface
2. Implement the `ToData` method for serialization and compression
3. Implement the `FromData` method for decompression and deserialization
4. Register the converter in the worker options
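
The worker setup in the sample's `main.go` is not shown in this excerpt. A minimal sketch of step 4, assuming the `DataConverter` field on `worker.Options` from `go.uber.org/cadence/worker`:

```
package main

import (
	"go.uber.org/cadence/worker"
)

// newWorkerOptions is a hypothetical helper, not part of the sample: it
// plugs the compressed JSON converter into the worker so that every
// workflow and activity payload passes through ToData/FromData.
func newWorkerOptions() worker.Options {
	return worker.Options{
		DataConverter: NewCompressedJSONDataConverter(),
	}
}
```

Only the options wiring changes; workflow and activity code stay exactly as they are.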

This pattern is useful when you need to:
- Reduce storage costs and bandwidth usage with compression
- Use specific serialization formats for performance or compatibility
- Add encryption/decryption to workflow data
- Implement custom compression algorithms (LZ4, Snappy, etc.)
- Support legacy data formats
- Add data validation or transformation during serialization

## Performance Benefits

The compressed data converter provides the following (a size-comparison sketch follows this list):
- **Storage Savings**: Typically 60-80% reduction in data size for JSON payloads
- **Bandwidth Reduction**: Lower network transfer costs and faster data transmission
- **Cost Optimization**: Reduced storage costs in Cadence history
- **Scalability**: Better performance with large payloads
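
The implementation of `GetPayloadSizeInfo()` is not shown in this excerpt; a rough, hypothetical way to reproduce the size comparison yourself, using only the converter defined in `dataconverter.go`:

```
package main

import (
	"encoding/json"
	"fmt"
)

// printSizeComparison is a hypothetical helper, not part of the sample: it
// compares a plain JSON encoding of a value against the gzip-compressed
// bytes produced by the sample's converter. Actual ratios depend on the payload.
func printSizeComparison(v interface{}) error {
	plain, err := json.Marshal(v)
	if err != nil {
		return err
	}
	compressed, err := NewCompressedJSONDataConverter().ToData(v)
	if err != nil {
		return err
	}
	fmt.Printf("plain JSON: %d bytes, compressed: %d bytes (%.0f%%)\n",
		len(plain), len(compressed),
		float64(len(compressed))/float64(len(plain))*100)
	return nil
}
```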
71 changes: 71 additions & 0 deletions cmd/samples/recipes/dataconverter/dataconverter.go
@@ -0,0 +1,71 @@
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
	"reflect"

	"go.uber.org/cadence/encoded"
)

type compressedJSONDataConverter struct{}

func NewCompressedJSONDataConverter() encoded.DataConverter {
	return &compressedJSONDataConverter{}
}

func (dc *compressedJSONDataConverter) ToData(value ...interface{}) ([]byte, error) {
	// First, serialize to JSON
	var jsonBuf bytes.Buffer
	enc := json.NewEncoder(&jsonBuf)
	for i, obj := range value {
		err := enc.Encode(obj)
		if err != nil {
			return nil, fmt.Errorf("unable to encode argument: %d, %v, with error: %v", i, reflect.TypeOf(obj), err)
		}
	}

	// Then compress the JSON data
	var compressedBuf bytes.Buffer
	gzipWriter := gzip.NewWriter(&compressedBuf)

	_, err := gzipWriter.Write(jsonBuf.Bytes())
	if err != nil {
		return nil, fmt.Errorf("unable to compress data: %v", err)
	}

	err = gzipWriter.Close()
	if err != nil {
		return nil, fmt.Errorf("unable to close gzip writer: %v", err)
	}

	return compressedBuf.Bytes(), nil
}

func (dc *compressedJSONDataConverter) FromData(input []byte, valuePtr ...interface{}) error {
	// First, decompress the data
	gzipReader, err := gzip.NewReader(bytes.NewBuffer(input))
	if err != nil {
		return fmt.Errorf("unable to create gzip reader: %v", err)
	}
	defer gzipReader.Close()

	// Read the decompressed JSON data
	decompressedData, err := io.ReadAll(gzipReader)
	if err != nil {
		return fmt.Errorf("unable to decompress data: %v", err)
	}

	// Then deserialize from JSON
	dec := json.NewDecoder(bytes.NewBuffer(decompressedData))
	for i, obj := range valuePtr {
		err := dec.Decode(obj)
		if err != nil {
			return fmt.Errorf("unable to decode argument: %d, %v, with error: %v", i, reflect.TypeOf(obj), err)
		}
	}
	return nil
}