diff --git a/common/dynamicconfig/dynamicproperties/constants.go b/common/dynamicconfig/dynamicproperties/constants.go index 41e5aefac09..ef5961c343c 100644 --- a/common/dynamicconfig/dynamicproperties/constants.go +++ b/common/dynamicconfig/dynamicproperties/constants.go @@ -2418,6 +2418,12 @@ const ( // LastStringKey must be the last one in this const group LastStringKey + + // SerializationEncoding is the encoding type for blobs + // KeyName: history.serializationEncoding + // Value type: String + // Default value: "thriftrw" + SerializationEncoding ) const ( @@ -4911,6 +4917,11 @@ var StringKeys = map[StringKey]DynamicString{ Description: "MatchingShardDistributionMode defines which shard distribution mode should be used", DefaultValue: "hash_ring", }, + SerializationEncoding: { + KeyName: "history.serializationEncoding", + Description: "SerializationEncoding is the encoding type for blobs", + DefaultValue: string(constants.EncodingTypeThriftRW), + }, } var DurationKeys = map[DurationKey]DynamicDuration{ diff --git a/common/persistence/client/factory.go b/common/persistence/client/factory.go index 5fc976f4df2..a37216c5fad 100644 --- a/common/persistence/client/factory.go +++ b/common/persistence/client/factory.go @@ -526,12 +526,12 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite defaultDataStore := Datastore{ratelimit: limiters[f.config.DefaultStore]} switch { case defaultCfg.NoSQL != nil: - parser := getParser(f.logger, constants.EncodingTypeThriftRW, constants.EncodingTypeThriftRW) + parser := f.getParser() taskSerializer := serialization.NewTaskSerializer(parser) shardedNoSQLConfig := defaultCfg.NoSQL.ConvertToShardedNoSQLConfig() defaultDataStore.factory = nosql.NewFactory(*shardedNoSQLConfig, clusterName, f.logger, f.metricsClient, taskSerializer, parser, f.dc) case defaultCfg.ShardedNoSQL != nil: - parser := getParser(f.logger, constants.EncodingTypeThriftRW, constants.EncodingTypeThriftRW) + parser := f.getParser() taskSerializer := serialization.NewTaskSerializer(parser) defaultDataStore.factory = nosql.NewFactory(*defaultCfg.ShardedNoSQL, clusterName, f.logger, f.metricsClient, taskSerializer, parser, f.dc) case defaultCfg.SQL != nil: @@ -543,16 +543,7 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite string(constants.EncodingTypeThriftRW), } } - var decodingTypes []constants.EncodingType - for _, dt := range defaultCfg.SQL.DecodingTypes { - decodingTypes = append(decodingTypes, constants.EncodingType(dt)) - } - defaultDataStore.factory = sql.NewFactory( - *defaultCfg.SQL, - clusterName, - f.logger, - getParser(f.logger, constants.EncodingType(defaultCfg.SQL.EncodingType), decodingTypes...), - f.dc) + defaultDataStore.factory = sql.NewFactory(*defaultCfg.SQL, clusterName, f.logger, f.getParser(), f.dc) default: f.logger.Fatal("invalid config: one of nosql or sql params must be specified for defaultDataStore") } @@ -576,21 +567,12 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite visibilityDataStore := Datastore{ratelimit: limiters[f.config.VisibilityStore]} switch { case visibilityCfg.NoSQL != nil: - parser := getParser(f.logger, constants.EncodingTypeThriftRW, constants.EncodingTypeThriftRW) + parser := f.getParser() taskSerializer := serialization.NewTaskSerializer(parser) shardedNoSQLConfig := visibilityCfg.NoSQL.ConvertToShardedNoSQLConfig() visibilityDataStore.factory = nosql.NewFactory(*shardedNoSQLConfig, clusterName, f.logger, f.metricsClient, taskSerializer, parser, f.dc) case visibilityCfg.SQL != nil: - var decodingTypes []constants.EncodingType - for _, dt := range visibilityCfg.SQL.DecodingTypes { - decodingTypes = append(decodingTypes, constants.EncodingType(dt)) - } - visibilityDataStore.factory = sql.NewFactory( - *visibilityCfg.SQL, - clusterName, - f.logger, - getParser(f.logger, constants.EncodingType(visibilityCfg.SQL.EncodingType), decodingTypes...), - f.dc) + visibilityDataStore.factory = sql.NewFactory(*visibilityCfg.SQL, clusterName, f.logger, f.getParser(), f.dc) default: f.logger.Fatal("invalid config: one of nosql or sql params must be specified for visibilityStore") } @@ -598,10 +580,10 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite f.datastores[storeTypeVisibility] = visibilityDataStore } -func getParser(logger log.Logger, encodingType constants.EncodingType, decodingTypes ...constants.EncodingType) serialization.Parser { - parser, err := serialization.NewParser(encodingType, decodingTypes...) +func (f *factoryImpl) getParser() serialization.Parser { + parser, err := serialization.NewParser(f.dc) if err != nil { - logger.Fatal("failed to construct parser", tag.Error(err)) + f.logger.Fatal("failed to construct parser", tag.Error(err)) } return parser } diff --git a/common/persistence/config.go b/common/persistence/config.go index 5db433e26cb..b8db3c90663 100644 --- a/common/persistence/config.go +++ b/common/persistence/config.go @@ -35,6 +35,7 @@ type ( EnableHistoryTaskDualWriteMode dynamicproperties.BoolPropertyFn ReadNoSQLHistoryTaskFromDataBlob dynamicproperties.BoolPropertyFn ReadNoSQLShardFromDataBlob dynamicproperties.BoolPropertyFn + SerializationEncoding dynamicproperties.StringPropertyFn } ) @@ -48,5 +49,6 @@ func NewDynamicConfiguration(dc *dynamicconfig.Collection) *DynamicConfiguration EnableHistoryTaskDualWriteMode: dc.GetBoolProperty(dynamicproperties.EnableNoSQLHistoryTaskDualWriteMode), ReadNoSQLHistoryTaskFromDataBlob: dc.GetBoolProperty(dynamicproperties.ReadNoSQLHistoryTaskFromDataBlob), ReadNoSQLShardFromDataBlob: dc.GetBoolProperty(dynamicproperties.ReadNoSQLShardFromDataBlob), + SerializationEncoding: dc.GetStringProperty(dynamicproperties.SerializationEncoding), } } diff --git a/common/persistence/persistence-tests/persistenceTestBase.go b/common/persistence/persistence-tests/persistenceTestBase.go index c574508a7a9..113fe3fa31a 100644 --- a/common/persistence/persistence-tests/persistenceTestBase.go +++ b/common/persistence/persistence-tests/persistenceTestBase.go @@ -157,6 +157,7 @@ func NewTestBaseWithNoSQL(t *testing.T, options *TestBaseOptions) *TestBase { EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true), ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), } params := TestBaseParams{ DefaultTestCluster: testCluster, @@ -188,6 +189,7 @@ func NewTestBaseWithSQL(t *testing.T, options *TestBaseOptions) *TestBase { EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true), ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), } params := TestBaseParams{ DefaultTestCluster: testCluster, diff --git a/common/persistence/serialization/parser.go b/common/persistence/serialization/parser.go index 2306c10fb91..7f5931f5ecb 100644 --- a/common/persistence/serialization/parser.go +++ b/common/persistence/serialization/parser.go @@ -31,193 +31,295 @@ import ( type ( parser struct { - encoder encoder + dc *persistence.DynamicConfiguration + encoders map[constants.EncodingType]encoder decoders map[constants.EncodingType]decoder } ) +var allBlobEncodings = []constants.EncodingType{ + constants.EncodingTypeThriftRW, + constants.EncodingTypeThriftRWSnappy, +} + // NewParser constructs a new parser using encoder as specified by encodingType and using decoders specified by decodingTypes -func NewParser(encodingType constants.EncodingType, decodingTypes ...constants.EncodingType) (Parser, error) { - encoder, err := getEncoder(encodingType) - if err != nil { - return nil, err - } +func NewParser(dc *persistence.DynamicConfiguration) (Parser, error) { + encoders := make(map[constants.EncodingType]encoder) decoders := make(map[constants.EncodingType]decoder) - for _, dt := range decodingTypes { + + for _, dt := range allBlobEncodings { decoder, err := getDecoder(dt) if err != nil { return nil, err } decoders[dt] = decoder + + encoder, err := getEncoder(dt) + if err != nil { + return nil, err + } + encoders[dt] = encoder } return &parser{ - encoder: encoder, + dc: dc, + encoders: encoders, decoders: decoders, }, nil } func (p *parser) ShardInfoToBlob(info *ShardInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.shardInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) if err != nil { return db, err } + + data, err := encoder.shardInfoToBlob(info) + if err != nil { + return db, err + } + db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) DomainInfoToBlob(info *DomainInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.domainInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.domainInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) HistoryTreeInfoToBlob(info *HistoryTreeInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.historyTreeInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.historyTreeInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) WorkflowExecutionInfoToBlob(info *WorkflowExecutionInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.workflowExecutionInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.workflowExecutionInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) ActivityInfoToBlob(info *ActivityInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.activityInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.activityInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) ChildExecutionInfoToBlob(info *ChildExecutionInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.childExecutionInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.childExecutionInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) SignalInfoToBlob(info *SignalInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.signalInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.signalInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) RequestCancelInfoToBlob(info *RequestCancelInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.requestCancelInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.requestCancelInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) TimerInfoToBlob(info *TimerInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.timerInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.timerInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) TaskInfoToBlob(info *TaskInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.taskInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.taskInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) TaskListInfoToBlob(info *TaskListInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.taskListInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.taskListInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) TransferTaskInfoToBlob(info *TransferTaskInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.transferTaskInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.transferTaskInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) CrossClusterTaskInfoToBlob(info *CrossClusterTaskInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.crossClusterTaskInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.crossClusterTaskInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) TimerTaskInfoToBlob(info *TimerTaskInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.timerTaskInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.timerTaskInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) ReplicationTaskInfoToBlob(info *ReplicationTaskInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.replicationTaskInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.replicationTaskInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } @@ -341,6 +443,14 @@ func (p *parser) ReplicationTaskInfoFromBlob(data []byte, encoding string) (*Rep return decoder.replicationTaskInfoFromBlob(data) } +func (p *parser) getCachedEncoder(encoding constants.EncodingType) (encoder, error) { + encoder, ok := p.encoders[encoding] + if !ok { + return nil, unsupportedEncodingError(encoding) + } + return encoder, nil +} + func (p *parser) getCachedDecoder(encoding constants.EncodingType) (decoder, error) { decoder, ok := p.decoders[encoding] if !ok { diff --git a/common/persistence/serialization/parser_test.go b/common/persistence/serialization/parser_test.go index 622bb8ee07e..f2cca2a6945 100644 --- a/common/persistence/serialization/parser_test.go +++ b/common/persistence/serialization/parser_test.go @@ -33,12 +33,16 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/constants" + "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" ) func TestParserRoundTrip(t *testing.T) { - thriftParser, err := NewParser(constants.EncodingTypeThriftRW, constants.EncodingTypeThriftRW) + dc := &persistence.DynamicConfiguration{ + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), + } + thriftParser, err := NewParser(dc) assert.NoError(t, err) now := time.Now().Round(time.Second) @@ -355,7 +359,10 @@ func TestParser_WorkflowExecution_with_cron(t *testing.T) { CronSchedule: "@every 1m", IsCron: true, } - parser, err := NewParser(constants.EncodingTypeThriftRW, constants.EncodingTypeThriftRW) + dc := &persistence.DynamicConfiguration{ + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), + } + parser, err := NewParser(dc) require.NoError(t, err) blob, err := parser.WorkflowExecutionInfoToBlob(info) require.NoError(t, err) diff --git a/common/persistence/serialization/snappy_thrift_decoder_test.go b/common/persistence/serialization/snappy_thrift_decoder_test.go index 4f5456d8962..526b4690746 100644 --- a/common/persistence/serialization/snappy_thrift_decoder_test.go +++ b/common/persistence/serialization/snappy_thrift_decoder_test.go @@ -31,10 +31,15 @@ import ( "github.com/stretchr/testify/require" "github.com/uber/cadence/common/constants" + "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" + "github.com/uber/cadence/common/persistence" ) func TestSnappyThriftDecoderRoundTrip(t *testing.T) { - parser, err := NewParser(constants.EncodingTypeThriftRWSnappy, constants.EncodingTypeThriftRWSnappy) + dc := &persistence.DynamicConfiguration{ + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRWSnappy)), + } + parser, err := NewParser(dc) require.NoError(t, err) testCases := []struct { diff --git a/common/persistence/serialization/snappy_thrift_encoder_test.go b/common/persistence/serialization/snappy_thrift_encoder_test.go index b80efc7ebc4..d5b69a87c95 100644 --- a/common/persistence/serialization/snappy_thrift_encoder_test.go +++ b/common/persistence/serialization/snappy_thrift_encoder_test.go @@ -31,6 +31,8 @@ import ( "github.com/stretchr/testify/require" "github.com/uber/cadence/common/constants" + "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" + "github.com/uber/cadence/common/persistence" ) func TestSnappyThriftEncoderRoundTrip(t *testing.T) { @@ -402,8 +404,11 @@ func TestSnappyThriftRWEncode(t *testing.T) { } func TestSnappyThriftEncoderWithParser(t *testing.T) { + dc := &persistence.DynamicConfiguration{ + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRWSnappy)), + } // Test encoder integration with parser - parser, err := NewParser(constants.EncodingTypeThriftRWSnappy, constants.EncodingTypeThriftRWSnappy) + parser, err := NewParser(dc) require.NoError(t, err) testData := shardInfoTestData diff --git a/common/persistence/serialization/task_serializer_test.go b/common/persistence/serialization/task_serializer_test.go index b077c8b50d5..d462fbdbcd3 100644 --- a/common/persistence/serialization/task_serializer_test.go +++ b/common/persistence/serialization/task_serializer_test.go @@ -30,11 +30,15 @@ import ( "github.com/stretchr/testify/require" "github.com/uber/cadence/common/constants" + "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/persistence" ) func TestTaskSerializerThriftRW(t *testing.T) { - parser, err := NewParser(constants.EncodingTypeThriftRW, constants.EncodingTypeThriftRW) + dc := &persistence.DynamicConfiguration{ + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), + } + parser, err := NewParser(dc) require.NoError(t, err) taskSerializer := NewTaskSerializer(parser) diff --git a/common/persistence/sql/sql_task_store_test.go b/common/persistence/sql/sql_task_store_test.go index 11d0aa99e02..37cc070712b 100644 --- a/common/persistence/sql/sql_task_store_test.go +++ b/common/persistence/sql/sql_task_store_test.go @@ -37,6 +37,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/constants" + "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/persistence/serialization" "github.com/uber/cadence/common/persistence/sql/sqlplugin" @@ -1721,7 +1722,10 @@ func TestHelp(t *testing.T) { shard := sqlplugin.GetDBShardIDFromDomainIDAndTasklist("c4b5cb22-c213-4812-bb4a-fc1ade5405ef", "pgtasklist", 16384) println("shard: ", shard) // BgAKAAAKAAwAAAAABQ+kWAoADgAAAAAAAAAACgAQGB6uFsU9cvEMABIKAAoAAAAAAAAAAQgADAAAAAAIAA4AAAAAAAA= - parser, err := serialization.NewParser(constants.EncodingTypeThriftRW, constants.EncodingTypeThriftRW) + dc := &persistence.DynamicConfiguration{ + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), + } + parser, err := serialization.NewParser(dc) require.NoError(t, err) data, err := base64.StdEncoding.DecodeString("BgAKAAAKAAwAAAAABGGFYAoADgAAAAAAAAAACgAQGB6uGPaVqOUMABIKAAoAAAAAAAAAAQgADAAAAAIIAA4AAAACAAA=") require.NoError(t, err) diff --git a/host/async_wf_test.go b/host/async_wf_test.go index 4bedf7859a3..da19afe614f 100644 --- a/host/async_wf_test.go +++ b/host/async_wf_test.go @@ -52,6 +52,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/constants" "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/persistence" pt "github.com/uber/cadence/common/persistence/persistence-tests" @@ -99,6 +100,7 @@ func (s *AsyncWFIntegrationSuite) SetupSuite() { EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true), ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), } params := pt.TestBaseParams{ DefaultTestCluster: s.DefaultTestCluster, diff --git a/host/integrationbase.go b/host/integrationbase.go index 96b11784d93..062716fcaf8 100644 --- a/host/integrationbase.go +++ b/host/integrationbase.go @@ -117,6 +117,7 @@ func (s *IntegrationBase) setupSuite() { EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true), ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), } params := pt.TestBaseParams{ DefaultTestCluster: s.DefaultTestCluster, diff --git a/host/ndc/integration_test.go b/host/ndc/integration_test.go index a197af47298..bb0f98b7924 100644 --- a/host/ndc/integration_test.go +++ b/host/ndc/integration_test.go @@ -103,6 +103,7 @@ func (s *NDCIntegrationTestSuite) SetupSuite() { EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true), ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), } params := pt.TestBaseParams{ DefaultTestCluster: s.defaultTestCluster, diff --git a/host/pinot_test.go b/host/pinot_test.go index 45b87c230b2..8f2cff079ec 100644 --- a/host/pinot_test.go +++ b/host/pinot_test.go @@ -56,6 +56,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/constants" "github.com/uber/cadence/common/definition" "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/log" @@ -114,6 +115,7 @@ func (s *PinotIntegrationSuite) SetupSuite() { EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true), ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), } params := pt.TestBaseParams{ DefaultTestCluster: s.DefaultTestCluster, diff --git a/host/workflowidratelimit_test.go b/host/workflowidratelimit_test.go index 61965b86453..55a6b68089e 100644 --- a/host/workflowidratelimit_test.go +++ b/host/workflowidratelimit_test.go @@ -77,6 +77,7 @@ func (s *WorkflowIDRateLimitIntegrationSuite) SetupSuite() { EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true), ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), } params := pt.TestBaseParams{ DefaultTestCluster: s.DefaultTestCluster, diff --git a/host/workflowsidinternalratelimit_test.go b/host/workflowsidinternalratelimit_test.go index ec498851695..eb82a170aa5 100644 --- a/host/workflowsidinternalratelimit_test.go +++ b/host/workflowsidinternalratelimit_test.go @@ -36,6 +36,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/constants" "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/persistence" @@ -80,6 +81,7 @@ func (s *WorkflowIDInternalRateLimitIntegrationSuite) SetupSuite() { EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true), ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), } params := pt.TestBaseParams{ DefaultTestCluster: s.DefaultTestCluster, diff --git a/simulation/history/history_simulation_test.go b/simulation/history/history_simulation_test.go index 38ece6f099d..59d1caff925 100644 --- a/simulation/history/history_simulation_test.go +++ b/simulation/history/history_simulation_test.go @@ -17,6 +17,7 @@ import ( "go.uber.org/yarpc" "go.uber.org/yarpc/transport/grpc" + "github.com/uber/cadence/common/constants" "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/persistence" @@ -75,6 +76,7 @@ func (s *HistorySimulationSuite) SetupSuite() { EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true), ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), } params := pt.TestBaseParams{ DefaultTestCluster: s.DefaultTestCluster, diff --git a/simulation/matching/matching_simulation_test.go b/simulation/matching/matching_simulation_test.go index e345c881512..9c37554e0ef 100644 --- a/simulation/matching/matching_simulation_test.go +++ b/simulation/matching/matching_simulation_test.go @@ -168,6 +168,7 @@ func (s *MatchingSimulationSuite) SetupSuite() { PersistenceSampleLoggingRate: dynamicproperties.GetIntPropertyFn(100), EnableShardIDMetrics: dynamicproperties.GetBoolPropertyFn(true), EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true), + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), } params := pt.TestBaseParams{ DefaultTestCluster: s.DefaultTestCluster,