Skip to content

Commit 4ad1daa

Browse files
authored
Customizable serialization (#8426)
## What changed? Added `TEMPORAL_TEST_DATA_ENCODING` to change DataBlob encoding from "proto3" to "json". ## Why? Observability and debugging. The ability to see payloads decoded in debugger and OTEL traces is valuable. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [x] added new functional test(s) ## Potential risks End users should never use this. The env var name therefore includes `TEST_`.
1 parent 4a2e88a commit 4ad1daa

File tree

10 files changed

+230
-44
lines changed

10 files changed

+230
-44
lines changed

common/dynamicconfig/collection_test.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ const (
3838
testGetDurationPropertyStructuredDefaults = "testGetDurationPropertyStructuredDefaults"
3939
testGetBoolPropertyFilteredByNamespaceIDKey = "testGetBoolPropertyFilteredByNamespaceIDKey"
4040
testGetBoolPropertyFilteredByTaskQueueInfoKey = "testGetBoolPropertyFilteredByTaskQueueInfoKey"
41+
testGetStringPropertyFilteredByNamespaceKey = "testGetStringPropertyFilteredByNamespaceKey"
4142
testGetStringPropertyFilteredByNamespaceIDKey = "testGetStringPropertyFilteredByNamespaceIDKey"
4243
testGetIntPropertyFilteredByDestinationKey = "testGetIntPropertyFilteredByDestinationKey"
4344
)
@@ -87,17 +88,17 @@ func (s *collectionSuite) TestGetIntPropertyFilteredByNamespace() {
8788
}
8889

8990
func (s *collectionSuite) TestGetStringPropertyFnFilteredByNamespace() {
90-
namespace := "testNamespace"
91-
value := dynamicconfig.DefaultEventEncoding.Get(s.cln)
92-
// copied default value, change this if it changes
93-
s.Equal(enumspb.ENCODING_TYPE_PROTO3.String(), value(namespace))
94-
s.client.SetValue(dynamicconfig.DefaultEventEncoding.Key().String(), "efg")
95-
s.Equal("efg", value(namespace))
91+
ns := "testNamespace"
92+
setting := dynamicconfig.NewNamespaceStringSetting(testGetStringPropertyFilteredByNamespaceKey, "abc", "")
93+
value := setting.Get(s.cln)
94+
s.Equal("abc", value(ns))
95+
s.client.SetValue(testGetStringPropertyFilteredByNamespaceKey, "efg")
96+
s.Equal("efg", value(ns))
9697
}
9798

9899
func (s *collectionSuite) TestGetStringPropertyFnFilteredByNamespaceID() {
99-
setting := dynamicconfig.NewNamespaceIDStringSetting(testGetStringPropertyFilteredByNamespaceIDKey, "abc", "")
100100
namespaceID := namespace.ID("testNamespaceID")
101+
setting := dynamicconfig.NewNamespaceIDStringSetting(testGetStringPropertyFilteredByNamespaceIDKey, "abc", "")
101102
value := setting.Get(s.cln)
102103
s.Equal("abc", value(namespaceID))
103104
s.client.SetValue(testGetStringPropertyFilteredByNamespaceIDKey, "efg")

common/dynamicconfig/constants.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"os"
66
"time"
77

8-
enumspb "go.temporal.io/api/enums/v1"
98
sdkworker "go.temporal.io/sdk/worker"
109
"go.temporal.io/server/common/debug"
1110
"go.temporal.io/server/common/primitives"
@@ -2307,11 +2306,6 @@ When the this config is zero or lower we will only update shard info at most onc
23072306
false,
23082307
`EmitShardLagLog whether emit the shard lag log`,
23092308
)
2310-
DefaultEventEncoding = NewNamespaceStringSetting(
2311-
"history.defaultEventEncoding",
2312-
enumspb.ENCODING_TYPE_PROTO3.String(),
2313-
`DefaultEventEncoding is the encoding type for history events`,
2314-
)
23152309
DefaultActivityRetryPolicy = NewNamespaceTypedSetting(
23162310
"history.defaultActivityRetryPolicy",
23172311
retrypolicy.DefaultDefaultRetrySettings,

common/persistence/serialization/codec.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,42 @@ package serialization
22

33
import (
44
"errors"
5+
"fmt"
6+
"os"
7+
"strings"
58

69
commonpb "go.temporal.io/api/common/v1"
710
enumspb "go.temporal.io/api/enums/v1"
811
"go.temporal.io/server/common/codec"
912
"google.golang.org/protobuf/proto"
1013
)
1114

15+
// SerializerDataEncodingEnvVar controls which codec is used for encoding DataBlobs.
16+
//
17+
// Currently supported values (case-insensitive):
18+
// - "json"
19+
// - "proto3"
20+
//
21+
// Decoding always support all encodings regardless of this setting.
22+
//
23+
// WARNING: This environment variable should only be used for testing; and never set it in production.
24+
const SerializerDataEncodingEnvVar = "TEMPORAL_TEST_DATA_ENCODING"
25+
26+
// EncodingTypeFromEnv returns an EncodingType based on the environment variable `TEMPORAL_TEST_DATA_ENCODING`.
27+
// It defaults to "ENCODING_TYPE_PROTO3" codec if the environment variable is not set.
28+
func EncodingTypeFromEnv() enumspb.EncodingType {
29+
codecType := os.Getenv(SerializerDataEncodingEnvVar)
30+
switch strings.ToLower(codecType) {
31+
case "", "proto3":
32+
return enumspb.ENCODING_TYPE_PROTO3
33+
case "json":
34+
return enumspb.ENCODING_TYPE_JSON
35+
default:
36+
//nolint:forbidigo // should fail fast and hard if used incorrectly
37+
panic(fmt.Sprintf("unknown codec %q for environment variable %s", codecType, SerializerDataEncodingEnvVar))
38+
}
39+
}
40+
1241
// ProtoEncode is kept for backward compatibility.
1342
func ProtoEncode(m proto.Message) (*commonpb.DataBlob, error) {
1443
return encodeBlob(m, enumspb.ENCODING_TYPE_PROTO3)

common/persistence/serialization/serializer.go

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,20 @@ import (
1818
"google.golang.org/protobuf/proto"
1919
)
2020

21-
// DefaultDecoder is here for convenience to skip the need to create a new Serializer when only decodig is needed.
22-
// It does not need an encoding type; as it will use the one defined in the DataBlob.
23-
var r Serializer = &serializerImpl{encodingType: enumspb.ENCODING_TYPE_UNSPECIFIED}
24-
var DefaultDecoder Decoder = r
21+
var (
22+
// DefaultDecoder is here for convenience to skip the need to create a new Serializer when only decoding is needed.
23+
// It does not need an encoding type; as it will use the one defined in the DataBlob.
24+
defaultSerializer Serializer = &serializerImpl{encodingType: enumspb.ENCODING_TYPE_UNSPECIFIED}
25+
DefaultDecoder Decoder = defaultSerializer
26+
27+
// proto3Encoder always encodes as proto3, used by EnsureProto3Encoding.
28+
proto3Encoder Encoder = &serializerImpl{encodingType: enumspb.ENCODING_TYPE_PROTO3}
29+
)
2530

2631
type (
2732
// Encoder is used to encode objects to DataBlobs.
2833
Encoder interface {
34+
EncodingType() enumspb.EncodingType
2935
SerializeEvents(batch []*historypb.HistoryEvent) (*commonpb.DataBlob, error)
3036
SerializeEvent(event *historypb.HistoryEvent) (*commonpb.DataBlob, error)
3137
SerializeClusterMetadata(icm *persistencespb.ClusterMetadata) (*commonpb.DataBlob, error)
@@ -133,7 +139,11 @@ type (
133139
)
134140

135141
func NewSerializer() Serializer {
136-
return &serializerImpl{encodingType: enumspb.ENCODING_TYPE_PROTO3}
142+
return &serializerImpl{encodingType: EncodingTypeFromEnv()}
143+
}
144+
145+
func (t *serializerImpl) EncodingType() enumspb.EncodingType {
146+
return t.encodingType
137147
}
138148

139149
func (t *serializerImpl) SerializeTask(
@@ -666,3 +676,25 @@ func (t *serializerImpl) QueueStateFromBlob(data *commonpb.DataBlob) (*persisten
666676
result := &persistencespb.QueueState{}
667677
return result, Decode(data, result)
668678
}
679+
680+
// ReencodeEventBlobsAsProto3 re-encodes event blobs as proto3 if the serializer uses a different encoding.
681+
// In production (proto3 encoding), this returns the input unchanged.
682+
func ReencodeEventBlobsAsProto3(serializer Serializer, blobs []*commonpb.DataBlob) ([]*commonpb.DataBlob, error) {
683+
if serializer.EncodingType() == enumspb.ENCODING_TYPE_PROTO3 || len(blobs) == 0 {
684+
return blobs, nil
685+
}
686+
687+
// Re-encode all blobs as proto3.
688+
result := make([]*commonpb.DataBlob, len(blobs))
689+
for i, blob := range blobs {
690+
events, err := serializer.DeserializeEvents(blob)
691+
if err != nil {
692+
return nil, err
693+
}
694+
result[i], err = proto3Encoder.SerializeEvents(events)
695+
if err != nil {
696+
return nil, err
697+
}
698+
}
699+
return result, nil
700+
}

docs/development/testing.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ This document describes the project's testing setup, utilities and best practice
1616
- `TEMPORAL_TEST_OTEL_OUTPUT`: Enables OpenTelemetry (OTEL) trace output for failed tests to the provided file path.
1717
- `TEMPORAL_TEST_SHARED_CLUSTERS`: Number of shared clusters in the pool. Each can be used by multiple tests simultaneously.
1818
- `TEMPORAL_TEST_DEDICATED_CLUSTERS`: Number of dedicated clusters in the pool. Each can be used by one test only at a time.
19+
- `TEMPORAL_TEST_DATA_ENCODING`: Controls the encoding used for persistence DataBlobs. Available options: `proto3` (default) or `json`.
1920

2021
### Debugging via IDE
2122

service/history/api/get_history_util.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,14 @@ func GetRawHistory(
135135
rawHistory = append(rawHistory, blob)
136136
}
137137
}
138+
139+
// Ensure all raw history is proto3 encoded since data may be stored in other formats during testing.
140+
// In production (proto3 encoding), this returns the input unchanged.
141+
rawHistory, err = serialization.ReencodeEventBlobsAsProto3(shardContext.GetPayloadSerializer(), rawHistory)
142+
if err != nil {
143+
return nil, nil, err
144+
}
145+
138146
return rawHistory, nextToken, nil
139147
}
140148

service/history/api/getworkflowexecutionrawhistory/api.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"go.temporal.io/server/common/metrics"
1414
"go.temporal.io/server/common/namespace"
1515
"go.temporal.io/server/common/persistence"
16+
"go.temporal.io/server/common/persistence/serialization"
1617
"go.temporal.io/server/common/persistence/versionhistory"
1718
"go.temporal.io/server/common/rpc/interceptor"
1819
"go.temporal.io/server/service/history/api"
@@ -130,9 +131,16 @@ func Invoke(
130131
metrics.OperationTag(metrics.AdminGetWorkflowExecutionRawHistoryScope),
131132
)
132133

134+
// Ensure all raw history is proto3 encoded since data may be stored in other formats during testing.
135+
// In production (proto3 encoding), this returns the input unchanged.
136+
historyBlobs, err := serialization.ReencodeEventBlobsAsProto3(shardContext.GetPayloadSerializer(), rawHistoryResponse.HistoryEventBlobs)
137+
if err != nil {
138+
return nil, err
139+
}
140+
133141
result :=
134142
&adminservice.GetWorkflowExecutionRawHistoryResponse{
135-
HistoryBatches: rawHistoryResponse.HistoryEventBlobs,
143+
HistoryBatches: historyBlobs,
136144
VersionHistory: targetVersionHistory,
137145
HistoryNodeIds: rawHistoryResponse.NodeIDs,
138146
}

service/history/api/getworkflowexecutionrawhistoryv2/api.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"go.temporal.io/server/common/metrics"
1414
"go.temporal.io/server/common/namespace"
1515
"go.temporal.io/server/common/persistence"
16+
"go.temporal.io/server/common/persistence/serialization"
1617
"go.temporal.io/server/common/persistence/versionhistory"
1718
"go.temporal.io/server/common/rpc/interceptor"
1819
"go.temporal.io/server/service/history/api"
@@ -136,9 +137,16 @@ func Invoke(
136137
metrics.OperationTag(metrics.AdminGetWorkflowExecutionRawHistoryV2Scope),
137138
)
138139

140+
// Ensure all raw history is proto3 encoded since data may be stored in other formats during testing.
141+
// In production (proto3 encoding), this returns the input unchanged.
142+
historyBlobs, err := serialization.ReencodeEventBlobsAsProto3(shardContext.GetPayloadSerializer(), rawHistoryResponse.HistoryEventBlobs)
143+
if err != nil {
144+
return nil, err
145+
}
146+
139147
result :=
140148
&adminservice.GetWorkflowExecutionRawHistoryV2Response{
141-
HistoryBatches: rawHistoryResponse.HistoryEventBlobs,
149+
HistoryBatches: historyBlobs,
142150
VersionHistory: targetVersionHistory,
143151
HistoryNodeIds: rawHistoryResponse.NodeIDs,
144152
}

service/history/configs/config.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,8 +200,6 @@ type Config struct {
200200
// right now only used by GetMutableState
201201
LongPollExpirationInterval dynamicconfig.DurationPropertyFnWithNamespaceFilter
202202

203-
// encoding the history events
204-
EventEncodingType dynamicconfig.StringPropertyFnWithNamespaceFilter
205203
// whether or not using ParentClosePolicy
206204
EnableParentClosePolicy dynamicconfig.BoolPropertyFnWithNamespaceFilter
207205
// whether or not enable system workers for processing parent close policy task
@@ -617,7 +615,6 @@ func NewConfig(
617615
// history client: client/history/client.go set the client timeout 30s
618616
// TODO: Return this value to the client: go.temporal.io/server/issues/294
619617
LongPollExpirationInterval: dynamicconfig.HistoryLongPollExpirationInterval.Get(dc),
620-
EventEncodingType: dynamicconfig.DefaultEventEncoding.Get(dc),
621618
EnableParentClosePolicy: dynamicconfig.EnableParentClosePolicy.Get(dc),
622619
NumParentClosePolicySystemWorkflows: dynamicconfig.NumParentClosePolicySystemWorkflows.Get(dc),
623620
EnableParentClosePolicyWorker: dynamicconfig.EnableParentClosePolicyWorker.Get(dc),

0 commit comments

Comments
 (0)