Skip to content

Commit dedaaf5

Browse files
authored
Add a feature flag for serialization encoding (#7148)
* Add thrift_snappy endoding * Move test payloads into a common file * Add feature flag for serialization encoding * Fix tests * Fix tests
1 parent 717d835 commit dedaaf5

18 files changed

+214
-70
lines changed

common/dynamicconfig/dynamicproperties/constants.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2418,6 +2418,12 @@ const (
24182418

24192419
// LastStringKey must be the last one in this const group
24202420
LastStringKey
2421+
2422+
// SerializationEncoding is the encoding type for blobs
2423+
// KeyName: history.serializationEncoding
2424+
// Value type: String
2425+
// Default value: "thriftrw"
2426+
SerializationEncoding
24212427
)
24222428

24232429
const (
@@ -4911,6 +4917,11 @@ var StringKeys = map[StringKey]DynamicString{
49114917
Description: "MatchingShardDistributionMode defines which shard distribution mode should be used",
49124918
DefaultValue: "hash_ring",
49134919
},
4920+
SerializationEncoding: {
4921+
KeyName: "history.serializationEncoding",
4922+
Description: "SerializationEncoding is the encoding type for blobs",
4923+
DefaultValue: string(constants.EncodingTypeThriftRW),
4924+
},
49144925
}
49154926

49164927
var DurationKeys = map[DurationKey]DynamicDuration{

common/persistence/client/factory.go

Lines changed: 8 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -526,12 +526,12 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite
526526
defaultDataStore := Datastore{ratelimit: limiters[f.config.DefaultStore]}
527527
switch {
528528
case defaultCfg.NoSQL != nil:
529-
parser := getParser(f.logger, constants.EncodingTypeThriftRW, constants.EncodingTypeThriftRW)
529+
parser := f.getParser()
530530
taskSerializer := serialization.NewTaskSerializer(parser)
531531
shardedNoSQLConfig := defaultCfg.NoSQL.ConvertToShardedNoSQLConfig()
532532
defaultDataStore.factory = nosql.NewFactory(*shardedNoSQLConfig, clusterName, f.logger, f.metricsClient, taskSerializer, parser, f.dc)
533533
case defaultCfg.ShardedNoSQL != nil:
534-
parser := getParser(f.logger, constants.EncodingTypeThriftRW, constants.EncodingTypeThriftRW)
534+
parser := f.getParser()
535535
taskSerializer := serialization.NewTaskSerializer(parser)
536536
defaultDataStore.factory = nosql.NewFactory(*defaultCfg.ShardedNoSQL, clusterName, f.logger, f.metricsClient, taskSerializer, parser, f.dc)
537537
case defaultCfg.SQL != nil:
@@ -543,16 +543,7 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite
543543
string(constants.EncodingTypeThriftRW),
544544
}
545545
}
546-
var decodingTypes []constants.EncodingType
547-
for _, dt := range defaultCfg.SQL.DecodingTypes {
548-
decodingTypes = append(decodingTypes, constants.EncodingType(dt))
549-
}
550-
defaultDataStore.factory = sql.NewFactory(
551-
*defaultCfg.SQL,
552-
clusterName,
553-
f.logger,
554-
getParser(f.logger, constants.EncodingType(defaultCfg.SQL.EncodingType), decodingTypes...),
555-
f.dc)
546+
defaultDataStore.factory = sql.NewFactory(*defaultCfg.SQL, clusterName, f.logger, f.getParser(), f.dc)
556547
default:
557548
f.logger.Fatal("invalid config: one of nosql or sql params must be specified for defaultDataStore")
558549
}
@@ -576,32 +567,23 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite
576567
visibilityDataStore := Datastore{ratelimit: limiters[f.config.VisibilityStore]}
577568
switch {
578569
case visibilityCfg.NoSQL != nil:
579-
parser := getParser(f.logger, constants.EncodingTypeThriftRW, constants.EncodingTypeThriftRW)
570+
parser := f.getParser()
580571
taskSerializer := serialization.NewTaskSerializer(parser)
581572
shardedNoSQLConfig := visibilityCfg.NoSQL.ConvertToShardedNoSQLConfig()
582573
visibilityDataStore.factory = nosql.NewFactory(*shardedNoSQLConfig, clusterName, f.logger, f.metricsClient, taskSerializer, parser, f.dc)
583574
case visibilityCfg.SQL != nil:
584-
var decodingTypes []constants.EncodingType
585-
for _, dt := range visibilityCfg.SQL.DecodingTypes {
586-
decodingTypes = append(decodingTypes, constants.EncodingType(dt))
587-
}
588-
visibilityDataStore.factory = sql.NewFactory(
589-
*visibilityCfg.SQL,
590-
clusterName,
591-
f.logger,
592-
getParser(f.logger, constants.EncodingType(visibilityCfg.SQL.EncodingType), decodingTypes...),
593-
f.dc)
575+
visibilityDataStore.factory = sql.NewFactory(*visibilityCfg.SQL, clusterName, f.logger, f.getParser(), f.dc)
594576
default:
595577
f.logger.Fatal("invalid config: one of nosql or sql params must be specified for visibilityStore")
596578
}
597579

598580
f.datastores[storeTypeVisibility] = visibilityDataStore
599581
}
600582

601-
func getParser(logger log.Logger, encodingType constants.EncodingType, decodingTypes ...constants.EncodingType) serialization.Parser {
602-
parser, err := serialization.NewParser(encodingType, decodingTypes...)
583+
func (f *factoryImpl) getParser() serialization.Parser {
584+
parser, err := serialization.NewParser(f.dc)
603585
if err != nil {
604-
logger.Fatal("failed to construct parser", tag.Error(err))
586+
f.logger.Fatal("failed to construct parser", tag.Error(err))
605587
}
606588
return parser
607589
}

common/persistence/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type (
3535
EnableHistoryTaskDualWriteMode dynamicproperties.BoolPropertyFn
3636
ReadNoSQLHistoryTaskFromDataBlob dynamicproperties.BoolPropertyFn
3737
ReadNoSQLShardFromDataBlob dynamicproperties.BoolPropertyFn
38+
SerializationEncoding dynamicproperties.StringPropertyFn
3839
}
3940
)
4041

@@ -48,5 +49,6 @@ func NewDynamicConfiguration(dc *dynamicconfig.Collection) *DynamicConfiguration
4849
EnableHistoryTaskDualWriteMode: dc.GetBoolProperty(dynamicproperties.EnableNoSQLHistoryTaskDualWriteMode),
4950
ReadNoSQLHistoryTaskFromDataBlob: dc.GetBoolProperty(dynamicproperties.ReadNoSQLHistoryTaskFromDataBlob),
5051
ReadNoSQLShardFromDataBlob: dc.GetBoolProperty(dynamicproperties.ReadNoSQLShardFromDataBlob),
52+
SerializationEncoding: dc.GetStringProperty(dynamicproperties.SerializationEncoding),
5153
}
5254
}

common/persistence/persistence-tests/persistenceTestBase.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ func NewTestBaseWithNoSQL(t *testing.T, options *TestBaseOptions) *TestBase {
157157
EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true),
158158
ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false),
159159
ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false),
160+
SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)),
160161
}
161162
params := TestBaseParams{
162163
DefaultTestCluster: testCluster,
@@ -188,6 +189,7 @@ func NewTestBaseWithSQL(t *testing.T, options *TestBaseOptions) *TestBase {
188189
EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true),
189190
ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false),
190191
ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false),
192+
SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)),
191193
}
192194
params := TestBaseParams{
193195
DefaultTestCluster: testCluster,

0 commit comments

Comments
 (0)