Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ PROGS = helloworld \
pageflow \
signalcounter \
sideeffect \
dataconverter \

TEST_ARG ?= -race -v -timeout 5m
BUILD := ./build
Expand Down Expand Up @@ -68,6 +69,7 @@ TEST_DIRS=./cmd/samples/cron \
./cmd/samples/recipes/searchattributes \
./cmd/samples/recipes/sideeffect \
./cmd/samples/recipes/signalcounter \
./cmd/samples/recipes/dataconverter \
./cmd/samples/recovery \
./cmd/samples/pso \

Expand Down Expand Up @@ -176,6 +178,9 @@ sideeffect:
versioning:
go build -o bin/versioning cmd/samples/recipes/versioning/*.go

dataconverter:
go build -o bin/dataconverter cmd/samples/recipes/dataconverter/*.go

bins: helloworld \
versioning \
delaystart \
Expand Down Expand Up @@ -207,6 +212,7 @@ bins: helloworld \
pageflow \
signalcounter \
sideeffect \
dataconverter \

test: bins
@rm -f test
Expand Down
50 changes: 50 additions & 0 deletions cmd/samples/recipes/dataconverter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Data Converter Sample

This sample workflow demonstrates how to use custom data converters in Cadence workflows. The data converter is responsible for serializing and deserializing workflow inputs, outputs, and activity parameters.

## Sample Description

The sample implements a custom JSON data converter that:
- Serializes workflow inputs and activity parameters to JSON format
- Deserializes workflow outputs and activity results from JSON format
- Provides better control over data serialization compared to the default data converter
- Can be extended to support custom serialization formats (e.g., Protocol Buffers, MessagePack)

The workflow takes a `MyPayload` struct as input, processes it through an activity, and returns the modified payload.

## Key Components

- **Custom Data Converter**: `jsonDataConverter` implements the `encoded.DataConverter` interface
- **Workflow**: `dataConverterWorkflow` demonstrates using custom data types with the converter
- **Activity**: `dataConverterActivity` processes the input and returns modified data
- **Test**: Includes unit tests to verify the data converter functionality

## Steps to Run Sample

1. You need a cadence service running. See details in cmd/samples/README.md

2. Run the following command to start the worker:
```
./bin/dataconverter -m worker
```

3. Run the following command to execute the workflow:
```
./bin/dataconverter -m trigger
```

You should see logs showing the workflow input being processed through the activity and the final result being returned.

## Customization

To use a different serialization format, you can implement your own data converter by:
1. Creating a struct that implements the `encoded.DataConverter` interface
2. Implementing the `ToData` method for serialization
3. Implementing the `FromData` method for deserialization
4. Registering the converter in the worker options

This pattern is useful when you need to:
- Use specific serialization formats for performance or compatibility
- Add encryption/decryption to workflow data
- Implement custom compression for large payloads
- Support legacy data formats
39 changes: 39 additions & 0 deletions cmd/samples/recipes/dataconverter/dataconverter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package main

import (
"bytes"
"encoding/json"
"fmt"
"reflect"

"go.uber.org/cadence/encoded"
)

type jsonDataConverter struct{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default data converter is also json so to make this sample a little bit more interesting we can apply compression

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated!


func NewJSONDataConverter() encoded.DataConverter {
return &jsonDataConverter{}
}

func (dc *jsonDataConverter) ToData(value ...interface{}) ([]byte, error) {
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
for i, obj := range value {
err := enc.Encode(obj)
if err != nil {
return nil, fmt.Errorf("unable to encode argument: %d, %v, with error: %v", i, reflect.TypeOf(obj), err)
}
}
return buf.Bytes(), nil
}

func (dc *jsonDataConverter) FromData(input []byte, valuePtr ...interface{}) error {
dec := json.NewDecoder(bytes.NewBuffer(input))
for i, obj := range valuePtr {
err := dec.Decode(obj)
if err != nil {
return fmt.Errorf("unable to decode argument: %d, %v, with error: %v", i, reflect.TypeOf(obj), err)
}
}
return nil
}
63 changes: 63 additions & 0 deletions cmd/samples/recipes/dataconverter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package main

import (
"flag"
"time"

"github.com/pborman/uuid"
"go.uber.org/cadence/client"
"go.uber.org/cadence/worker"

"github.com/uber-common/cadence-samples/cmd/samples/common"
)

const (
ApplicationName = "dataConverterTaskList"
)

func startWorkers(h *common.SampleHelper) {
workerOptions := worker.Options{
MetricsScope: h.WorkerMetricScope,
Logger: h.Logger,
FeatureFlags: client.FeatureFlags{
WorkflowExecutionAlreadyCompletedErrorEnabled: true,
},
DataConverter: NewJSONDataConverter(),
}
h.StartWorkers(h.Config.DomainName, ApplicationName, workerOptions)
}

func startWorkflow(h *common.SampleHelper) {
input := MyPayload{Msg: "hello", Count: 1}
workflowOptions := client.StartWorkflowOptions{
ID: "dataconverter_" + uuid.New(),
TaskList: ApplicationName,
ExecutionStartToCloseTimeout: time.Minute,
DecisionTaskStartToCloseTimeout: time.Minute,
}
h.StartWorkflow(workflowOptions, DataConverterWorkflowName, input)
}

func registerWorkflowAndActivity(h *common.SampleHelper) {
h.RegisterWorkflowWithAlias(dataConverterWorkflow, DataConverterWorkflowName)
h.RegisterActivity(dataConverterActivity)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two are unused. Let's keep the sample lean

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed thanks!

}

func main() {
var mode string
flag.StringVar(&mode, "m", "trigger", "Mode is worker or trigger.")
flag.Parse()

var h common.SampleHelper
h.DataConverter = NewJSONDataConverter()
h.SetupServiceConfig()

switch mode {
case "worker":
registerWorkflowAndActivity(&h)
startWorkers(&h)
select {}
case "trigger":
startWorkflow(&h)
}
}
46 changes: 46 additions & 0 deletions cmd/samples/recipes/dataconverter/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package main

import (
"context"
"time"

"go.uber.org/cadence/activity"
"go.uber.org/cadence/workflow"
"go.uber.org/zap"
)

type MyPayload struct {
Msg string
Count int
}

const DataConverterWorkflowName = "dataConverterWorkflow"

func dataConverterWorkflow(ctx workflow.Context, input MyPayload) (MyPayload, error) {
logger := workflow.GetLogger(ctx)
logger.Info("Workflow started", zap.Any("input", input))

activityOptions := workflow.ActivityOptions{
ScheduleToStartTimeout: time.Minute,
StartToCloseTimeout: time.Minute,
}
ctx = workflow.WithActivityOptions(ctx, activityOptions)

var result MyPayload
err := workflow.ExecuteActivity(ctx, dataConverterActivity, input).Get(ctx, &result)
if err != nil {
logger.Error("Activity failed", zap.Error(err))
return MyPayload{}, err
}
logger.Info("Workflow completed", zap.Any("result", result))
return result, nil
}

func dataConverterActivity(ctx context.Context, input MyPayload) (MyPayload, error) {
logger := activity.GetLogger(ctx)
logger.Info("Activity received input", zap.Any("input", input))
input.Msg = input.Msg + " processed"
input.Count++
logger.Info("Activity returning", zap.Any("output", input))
return input, nil
}
38 changes: 38 additions & 0 deletions cmd/samples/recipes/dataconverter/workflow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/cadence/activity"
"go.uber.org/cadence/encoded"
"go.uber.org/cadence/testsuite"
"go.uber.org/cadence/worker"
)

func Test_DataConverterWorkflow(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
env.RegisterWorkflow(dataConverterWorkflow)
env.RegisterActivity(dataConverterActivity)

dataConverter := NewJSONDataConverter()
workerOptions := worker.Options{
DataConverter: dataConverter,
}
env.SetWorkerOptions(workerOptions)

input := MyPayload{Msg: "test", Count: 42}

var activityResult MyPayload
env.SetOnActivityCompletedListener(func(activityInfo *activity.Info, result encoded.Value, err error) {
result.Get(&activityResult)
})

env.ExecuteWorkflow(dataConverterWorkflow, input)

require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
require.Equal(t, "test processed", activityResult.Msg)
require.Equal(t, 43, activityResult.Count)
}
Loading