Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions data-converter-large-payloads/README.md
Copy link
Member

@cretz cretz Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are going to do implicit large payload offloading, we recommend using a codec instead of a converter, as https://github.com/DataDog/temporal-large-payload-codec has.

However, with that third party example, we are not necessarily looking for a blessed example of this at this time for two reasons. First, an ideal sample since Temporal is distributed would not write to disk. Second, this isn't often a preferred pattern as arbitrarily offloading large data to/from external stores can hide from the workflow authors that they should consider being more explicit about offloading large data instead of constantly doing this on replay and such. Finally, we are actively working on improving this situation soon doing basically this exact thing but providing more explicit external payload storage interfaces and warnings.

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
This sample shows an implementation of a payload converter that automatically detects workflow and activity payloads larger than a certain threshold and writes them to a file if that's the case.

### Steps to run this sample:
1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use).
2) Run the following command to start the worker
```
go run data-converter-large-payloads/worker/main.go
```
3) Run the following command to start the example
```
go run data-converter-large-payloads/starter/main.go
```

For the inputs that exceed the Threshold, you should see them in the Temporal UI looking something like this:

```
{
"metadata": {
"encoding": "ZXhhbXBsZS9sb2NhbF9maWxl"
},
"data": "MDdkYjVhZTUtMGIyMC00OTRjLTk2NzgtNWExOGQwY2RiYmQ4"
}
```

Contents are base64 encoded.

The files specified by "data" (the decoded value) should be created locally.
136 changes: 136 additions & 0 deletions data-converter-large-payloads/payloadconverter/data_converter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package dataconverter

import (
"encoding/json"
"fmt"
"os"

"github.com/google/uuid"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/sdk/converter"
)

const (
MB = 1_000_000
MetadataEncodingLocalFile = "example/local_file"
)

// LargeSizePayloadConverter is a payload converter that detects if incoming
// payloads are larger than a configurable threshold and if so, stores them to
// a file instead of passing them directly to the Temporal server. The threshold
// should be configured lower than the limit imposed by Temporal, which is 2MB.
//
// Note that other storage systems such as Postgres or S3 are a much better
// choice than local files in scenarios other than development and testing.
type LargeSizePayloadConverter struct {
// used for generating names of files
idGenerator idGenerator
// used to detect which payloads must be stored in files
threshold int
}

// idGenerator is used to generate the names of the files created for storing
// the large payloads.
type idGenerator interface {
NewString() string
}

// LargeSizePayloadConverterOption specifies configuration options for the
// payload converter.
type LargeSizePayloadConverterOption func(*LargeSizePayloadConverter)

// WithThreshold sets the threshold from which payloads start to get written to
// files. Payloads are stored as JSONs in files and the raw []byte JSON payload
// length is compared to the threshold.
func WithThreshold(sizeBytes int) LargeSizePayloadConverterOption {
return func(lspc *LargeSizePayloadConverter) {
lspc.threshold = sizeBytes
}
}

// NewLargeSizePayloadConverter creates new instance of
// LargeSizePayloadConverter.
//
// Do not use this payload converter on its own! Because the converter only
// applies to payloads larger than the threshold, there must be a fallback
// converter for payloads whose size is lower than the threshold. Always use
// this inside a composite payload converter.
func NewLargeSizePayloadConverter(opts ...LargeSizePayloadConverterOption) *LargeSizePayloadConverter {
c := LargeSizePayloadConverter{
idGenerator: uuidv4IDGenerator{},
threshold: 1 * MB,
}

for _, opt := range opts {
opt(&c)
}

return &c
}

// ToPayload implements converter.PayloadConverter.
func (c *LargeSizePayloadConverter) ToPayload(value any) (*commonpb.Payload, error) {

result, err := json.Marshal(value)
if err != nil {
return nil, fmt.Errorf("failed to marshal json: %w", err)
}

// will fallback on other registered converters
if len(result) < c.threshold {
return nil, nil
}

converter.NewJSONPayloadConverter()

filename := c.idGenerator.NewString()
err = os.WriteFile(filename, result, 0644)
if err != nil {
return nil, fmt.Errorf("failed to write payload to file: %w", err)
}

return &commonpb.Payload{
Metadata: map[string][]byte{
converter.MetadataEncoding: []byte(MetadataEncodingLocalFile),
},
Data: []byte(filename),
}, nil
}

// FromPayload implements converter.PayloadConverter.
func (c *LargeSizePayloadConverter) FromPayload(payload *commonpb.Payload, valuePtr any) error {

// This only gets called for payloads where the encoding is
// "example/local_file", in which case the file name is stored in the data.
filename := string(payload.Data)
data, err := os.ReadFile(filename)
if err != nil {
return fmt.Errorf("failed to read payload from file: %w", err)
}

err = json.Unmarshal(data, valuePtr)
if err != nil {
return fmt.Errorf("failed to unmarshal payload: %w", err)
}

return nil
}

// ToString implements converter.PayloadConverter.
func (c *LargeSizePayloadConverter) ToString(payload *commonpb.Payload) string {
// same as converter.JSONPayloadConverter
return string(payload.GetData())
}

// Encoding implements converter.PayloadConverter.
func (c *LargeSizePayloadConverter) Encoding() string {
return MetadataEncodingLocalFile
}

var _ converter.PayloadConverter = &LargeSizePayloadConverter{}

type uuidv4IDGenerator struct{}

func (u uuidv4IDGenerator) NewString() string {
return uuid.NewString()
}
Loading