From 6cc561a703ac7b45b5e6b0f1c989708d7d885f03 Mon Sep 17 00:00:00 2001 From: Ignat Tubylov Date: Wed, 23 Jul 2025 13:14:28 +0200 Subject: [PATCH 1/5] Add thrift_snappy endoding --- common/constants/constants.go | 13 +- common/persistence/data_store_interfaces.go | 2 + common/persistence/serialization/parser.go | 4 + .../serialization/snappy_thrift_decoder.go | 169 +++++ .../snappy_thrift_decoder_test.go | 611 +++++++++++++++ .../serialization/snappy_thrift_encoder.go | 113 +++ .../snappy_thrift_encoder_test.go | 709 ++++++++++++++++++ go.mod | 2 +- 8 files changed, 1616 insertions(+), 7 deletions(-) create mode 100644 common/persistence/serialization/snappy_thrift_decoder.go create mode 100644 common/persistence/serialization/snappy_thrift_decoder_test.go create mode 100644 common/persistence/serialization/snappy_thrift_encoder.go create mode 100644 common/persistence/serialization/snappy_thrift_encoder_test.go diff --git a/common/constants/constants.go b/common/constants/constants.go index 2d9dffaf5e6..8ff69575261 100644 --- a/common/constants/constants.go +++ b/common/constants/constants.go @@ -60,12 +60,13 @@ const ( // Data encoding types const ( - EncodingTypeJSON EncodingType = "json" - EncodingTypeThriftRW EncodingType = "thriftrw" - EncodingTypeGob EncodingType = "gob" - EncodingTypeUnknown EncodingType = "unknow" - EncodingTypeEmpty EncodingType = "" - EncodingTypeProto EncodingType = "proto3" + EncodingTypeJSON EncodingType = "json" + EncodingTypeThriftRW EncodingType = "thriftrw" + EncodingTypeThriftRWSnappy EncodingType = "thriftrw_snappy" + EncodingTypeGob EncodingType = "gob" + EncodingTypeUnknown EncodingType = "unknow" + EncodingTypeEmpty EncodingType = "" + EncodingTypeProto EncodingType = "proto3" ) type ( diff --git a/common/persistence/data_store_interfaces.go b/common/persistence/data_store_interfaces.go index e184b7772a8..5aebf63c8fc 100644 --- a/common/persistence/data_store_interfaces.go +++ b/common/persistence/data_store_interfaces.go @@ -965,6 +965,8 @@ func (d *DataBlob) GetEncoding() constants.EncodingType { return constants.EncodingTypeJSON case constants.EncodingTypeThriftRW: return constants.EncodingTypeThriftRW + case constants.EncodingTypeThriftRWSnappy: + return constants.EncodingTypeThriftRWSnappy case constants.EncodingTypeEmpty: return constants.EncodingTypeEmpty default: diff --git a/common/persistence/serialization/parser.go b/common/persistence/serialization/parser.go index 27eb47a85da..2306c10fb91 100644 --- a/common/persistence/serialization/parser.go +++ b/common/persistence/serialization/parser.go @@ -353,6 +353,8 @@ func getDecoder(encoding constants.EncodingType) (decoder, error) { switch encoding { case constants.EncodingTypeThriftRW: return newThriftDecoder(), nil + case constants.EncodingTypeThriftRWSnappy: + return newSnappyThriftDecoder(), nil default: return nil, unsupportedEncodingError(encoding) } @@ -362,6 +364,8 @@ func getEncoder(encoding constants.EncodingType) (encoder, error) { switch encoding { case constants.EncodingTypeThriftRW: return newThriftEncoder(), nil + case constants.EncodingTypeThriftRWSnappy: + return newSnappyThriftEncoder(), nil default: return nil, unsupportedEncodingError(encoding) } diff --git a/common/persistence/serialization/snappy_thrift_decoder.go b/common/persistence/serialization/snappy_thrift_decoder.go new file mode 100644 index 00000000000..bfcc283516d --- /dev/null +++ b/common/persistence/serialization/snappy_thrift_decoder.go @@ -0,0 +1,169 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package serialization + +import ( + "bytes" + + "github.com/golang/snappy" + "go.uber.org/thriftrw/protocol/binary" + + "github.com/uber/cadence/.gen/go/sqlblobs" +) + +type snappyThriftDecoder struct{} + +func newSnappyThriftDecoder() decoder { + return &snappyThriftDecoder{} +} + +func (d *snappyThriftDecoder) shardInfoFromBlob(data []byte) (*ShardInfo, error) { + result := &sqlblobs.ShardInfo{} + if err := snappyThriftRWDecode(data, result); err != nil { + return nil, err + } + return shardInfoFromThrift(result), nil +} + +func (d *snappyThriftDecoder) domainInfoFromBlob(data []byte) (*DomainInfo, error) { + result := &sqlblobs.DomainInfo{} + if err := snappyThriftRWDecode(data, result); err != nil { + return nil, err + } + return domainInfoFromThrift(result), nil +} + +func (d *snappyThriftDecoder) historyTreeInfoFromBlob(data []byte) (*HistoryTreeInfo, error) { + result := &sqlblobs.HistoryTreeInfo{} + if err := snappyThriftRWDecode(data, result); err != nil { + return nil, err + } + return historyTreeInfoFromThrift(result), nil +} + +func (d *snappyThriftDecoder) workflowExecutionInfoFromBlob(data []byte) (*WorkflowExecutionInfo, error) { + result := &sqlblobs.WorkflowExecutionInfo{} + if err := snappyThriftRWDecode(data, result); err != nil { + return nil, err + } + return workflowExecutionInfoFromThrift(result), nil +} + +func (d *snappyThriftDecoder) activityInfoFromBlob(data []byte) (*ActivityInfo, error) { + result := &sqlblobs.ActivityInfo{} + if err := snappyThriftRWDecode(data, result); err != nil { + return nil, err + } + return activityInfoFromThrift(result), nil +} + +func (d *snappyThriftDecoder) childExecutionInfoFromBlob(data []byte) (*ChildExecutionInfo, error) { + result := &sqlblobs.ChildExecutionInfo{} + if err := snappyThriftRWDecode(data, result); err != nil { + return nil, err + } + return childExecutionInfoFromThrift(result), nil +} + +func (d *snappyThriftDecoder) signalInfoFromBlob(data []byte) (*SignalInfo, error) { + result := &sqlblobs.SignalInfo{} + if err := snappyThriftRWDecode(data, result); err != nil { + return nil, err + } + return signalInfoFromThrift(result), nil +} + +func (d *snappyThriftDecoder) requestCancelInfoFromBlob(data []byte) (*RequestCancelInfo, error) { + result := &sqlblobs.RequestCancelInfo{} + if err := snappyThriftRWDecode(data, result); err != nil { + return nil, err + } + return requestCancelInfoFromThrift(result), nil +} + +func (d *snappyThriftDecoder) timerInfoFromBlob(data []byte) (*TimerInfo, error) { + result := &sqlblobs.TimerInfo{} + if err := snappyThriftRWDecode(data, result); err != nil { + return nil, err + } + return timerInfoFromThrift(result), nil +} + +func (d *snappyThriftDecoder) taskInfoFromBlob(data []byte) (*TaskInfo, error) { + result := &sqlblobs.TaskInfo{} + if err := snappyThriftRWDecode(data, result); err != nil { + return nil, err + } + return taskInfoFromThrift(result), nil +} + +func (d *snappyThriftDecoder) taskListInfoFromBlob(data []byte) (*TaskListInfo, error) { + result := &sqlblobs.TaskListInfo{} + if err := snappyThriftRWDecode(data, result); err != nil { + return nil, err + } + return taskListInfoFromThrift(result), nil +} + +func (d *snappyThriftDecoder) transferTaskInfoFromBlob(data []byte) (*TransferTaskInfo, error) { + result := &sqlblobs.TransferTaskInfo{} + if err := snappyThriftRWDecode(data, result); err != nil { + return nil, err + } + return transferTaskInfoFromThrift(result), nil +} + +func (d *snappyThriftDecoder) crossClusterTaskInfoFromBlob(data []byte) (*CrossClusterTaskInfo, error) { + result := &sqlblobs.TransferTaskInfo{} + if err := snappyThriftRWDecode(data, result); err != nil { + return nil, err + } + return crossClusterTaskInfoFromThrift(result), nil +} + +func (d *snappyThriftDecoder) timerTaskInfoFromBlob(data []byte) (*TimerTaskInfo, error) { + result := &sqlblobs.TimerTaskInfo{} + if err := snappyThriftRWDecode(data, result); err != nil { + return nil, err + } + return timerTaskInfoFromThrift(result), nil +} + +func (d *snappyThriftDecoder) replicationTaskInfoFromBlob(data []byte) (*ReplicationTaskInfo, error) { + result := &sqlblobs.ReplicationTaskInfo{} + if err := snappyThriftRWDecode(data, result); err != nil { + return nil, err + } + return replicationTaskInfoFromThrift(result), nil +} + +func snappyThriftRWDecode(b []byte, result thriftRWType) error { + decompressed, err := snappy.Decode(nil, b) + if err != nil { + return err + } + + buf := bytes.NewReader(decompressed) + sr := binary.Default.Reader(buf) + return result.Decode(sr) +} diff --git a/common/persistence/serialization/snappy_thrift_decoder_test.go b/common/persistence/serialization/snappy_thrift_decoder_test.go new file mode 100644 index 00000000000..3867ea27c67 --- /dev/null +++ b/common/persistence/serialization/snappy_thrift_decoder_test.go @@ -0,0 +1,611 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package serialization + +import ( + "testing" + "time" + + "github.com/golang/snappy" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/constants" + "github.com/uber/cadence/common/types" +) + +func TestSnappyThriftDecoderRoundTrip(t *testing.T) { + parser, err := NewParser(constants.EncodingTypeThriftRWSnappy, constants.EncodingTypeThriftRWSnappy) + require.NoError(t, err) + + now := time.Now().Round(time.Second) + + testCases := []struct { + name string + data interface{} + }{ + { + name: "ShardInfo", + data: &ShardInfo{ + StolenSinceRenew: 1, + UpdatedAt: now, + ReplicationAckLevel: 1, + TransferAckLevel: 1, + TimerAckLevel: now, + DomainNotificationVersion: 1, + ClusterTransferAckLevel: map[string]int64{"test": 1}, + ClusterTimerAckLevel: map[string]time.Time{"test": now}, + TransferProcessingQueueStates: []byte{1, 2, 3}, + TimerProcessingQueueStates: []byte{1, 2, 3}, + Owner: "owner", + ClusterReplicationLevel: map[string]int64{"test": 1}, + PendingFailoverMarkers: []byte{2, 3, 4}, + PendingFailoverMarkersEncoding: "", + TransferProcessingQueueStatesEncoding: "", + TimerProcessingQueueStatesEncoding: "", + }, + }, + { + name: "DomainInfo", + data: &DomainInfo{ + Name: "test", + Description: "test_desc", + Owner: "test_owner", + Status: 1, + Retention: 48 * time.Hour, + EmitMetric: true, + ArchivalBucket: "test_bucket", + ArchivalStatus: 1, + ConfigVersion: 1, + FailoverVersion: 1, + NotificationVersion: 1, + FailoverNotificationVersion: 1, + ActiveClusterName: "test_active_cluster", + Clusters: []string{"test_active_cluster", "test_standby_cluster"}, + Data: map[string]string{"test_key": "test_value"}, + BadBinaries: []byte{1, 2, 3}, + BadBinariesEncoding: "", + HistoryArchivalStatus: 1, + HistoryArchivalURI: "test_history_archival_uri", + VisibilityArchivalStatus: 1, + VisibilityArchivalURI: "test_visibility_archival_uri", + }, + }, + { + name: "HistoryTreeInfo", + data: &HistoryTreeInfo{ + CreatedTimestamp: now, + Ancestors: []*types.HistoryBranchRange{ + { + BranchID: "test_branch_id1", + }, + { + BranchID: "test_branch_id2", + }, + }, + Info: "test_info", + }, + }, + { + name: "WorkflowExecutionInfo", + data: &WorkflowExecutionInfo{ + ParentDomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + ParentWorkflowID: "test_parent_workflow_id", + ParentRunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + InitiatedID: 1, + CompletionEventBatchID: common.Int64Ptr(2), + CompletionEvent: []byte("test_completion_event"), + CompletionEventEncoding: "test_completion_event_encoding", + TaskList: "test_task_list", + WorkflowTypeName: "test_workflow_type", + WorkflowTimeout: 10 * time.Second, + DecisionTaskTimeout: 5 * time.Second, + ExecutionContext: []byte("test_execution_context"), + State: 1, + CloseStatus: 1, + StartVersion: 1, + LastWriteEventID: common.Int64Ptr(3), + LastEventTaskID: 4, + LastFirstEventID: 5, + LastProcessedEvent: 6, + StartTimestamp: now, + LastUpdatedTimestamp: now, + CreateRequestID: "test_create_request_id", + DecisionVersion: 7, + DecisionScheduleID: 8, + DecisionStartedID: 9, + DecisionRequestID: "test_decision_request_id", + DecisionTimeout: 3 * time.Second, + DecisionAttempt: 10, + DecisionStartedTimestamp: now, + DecisionScheduledTimestamp: now, + DecisionOriginalScheduledTimestamp: now, + CancelRequested: true, + CancelRequestID: "test_cancel_request_id", + StickyTaskList: "test_sticky_task_list", + StickyScheduleToStartTimeout: 2 * time.Second, + RetryAttempt: 11, + RetryInitialInterval: 1 * time.Second, + RetryMaximumInterval: 30 * time.Second, + RetryMaximumAttempts: 3, + RetryExpiration: time.Hour, + RetryBackoffCoefficient: 2.0, + RetryExpirationTimestamp: now, + RetryNonRetryableErrors: []string{"test_error"}, + HasRetryPolicy: true, + CronSchedule: "test_cron", + IsCron: true, + EventStoreVersion: 12, + EventBranchToken: []byte("test_branch_token"), + SignalCount: 13, + HistorySize: 14, + ClientLibraryVersion: "test_client_version", + ClientFeatureVersion: "test_feature_version", + ClientImpl: "test_client_impl", + AutoResetPoints: []byte("test_reset_points"), + AutoResetPointsEncoding: "test_reset_points_encoding", + SearchAttributes: map[string][]byte{"test_key": []byte("test_value")}, + Memo: map[string][]byte{"test_memo": []byte("test_memo_value")}, + VersionHistories: []byte("test_version_histories"), + VersionHistoriesEncoding: "test_version_histories_encoding", + FirstExecutionRunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + }, + }, + { + name: "ActivityInfo", + data: &ActivityInfo{ + Version: 1, + ScheduledEventBatchID: 2, + ScheduledEvent: []byte("test_scheduled_event"), + ScheduledEventEncoding: "test_scheduled_encoding", + ScheduledTimestamp: now, + StartedID: 3, + StartedEvent: []byte("test_started_event"), + StartedEventEncoding: "test_started_encoding", + StartedTimestamp: now, + ActivityID: "test_activity_id", + RequestID: "test_request_id", + ScheduleToStartTimeout: 5 * time.Second, + ScheduleToCloseTimeout: 10 * time.Second, + StartToCloseTimeout: 8 * time.Second, + HeartbeatTimeout: 3 * time.Second, + CancelRequested: true, + CancelRequestID: 4, + TimerTaskStatus: 5, + Attempt: 6, + TaskList: "test_task_list", + StartedIdentity: "test_identity", + HasRetryPolicy: true, + RetryInitialInterval: 1 * time.Second, + RetryMaximumInterval: 30 * time.Second, + RetryMaximumAttempts: 3, + RetryExpirationTimestamp: now, + RetryBackoffCoefficient: 2.0, + RetryNonRetryableErrors: []string{"test_error"}, + RetryLastFailureReason: "test_failure_reason", + RetryLastWorkerIdentity: "test_worker_identity", + RetryLastFailureDetails: []byte("test_failure_details"), + }, + }, + { + name: "ChildExecutionInfo", + data: &ChildExecutionInfo{ + Version: 1, + InitiatedEventBatchID: 2, + StartedID: 3, + InitiatedEvent: []byte("test_initiated_event"), + InitiatedEventEncoding: "test_initiated_encoding", + StartedWorkflowID: "test_started_workflow_id", + StartedRunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + StartedEvent: []byte("test_started_event"), + StartedEventEncoding: "test_started_encoding", + CreateRequestID: "test_create_request_id", + DomainID: "test_domain_id", + DomainNameDEPRECATED: "test_domain_name", + WorkflowTypeName: "test_workflow_type", + ParentClosePolicy: 4, + }, + }, + { + name: "SignalInfo", + data: &SignalInfo{ + Version: 1, + InitiatedEventBatchID: 2, + RequestID: "test_request_id", + Name: "test_signal_name", + Input: []byte("test_input"), + Control: []byte("test_control"), + }, + }, + { + name: "RequestCancelInfo", + data: &RequestCancelInfo{ + Version: 1, + InitiatedEventBatchID: 2, + CancelRequestID: "test_cancel_request_id", + }, + }, + { + name: "TimerInfo", + data: &TimerInfo{ + Version: 1, + StartedID: 2, + ExpiryTimestamp: now, + TaskID: 3, + }, + }, + { + name: "TaskInfo", + data: &TaskInfo{ + WorkflowID: "test_workflow_id", + RunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + ScheduleID: 1, + ExpiryTimestamp: now, + CreatedTimestamp: now, + PartitionConfig: map[string]string{"test_key": "test_value"}, + }, + }, + { + name: "TaskListInfo", + data: &TaskListInfo{ + Kind: 1, + AckLevel: 2, + ExpiryTimestamp: now, + LastUpdated: now, + }, + }, + { + name: "TransferTaskInfo", + data: &TransferTaskInfo{ + DomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + WorkflowID: "test_workflow_id", + RunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + TaskType: 1, + TargetDomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + TargetDomainIDs: []UUID{MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8")}, + TargetWorkflowID: "test_target_workflow_id", + TargetRunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + TaskList: "test_task_list", + TargetChildWorkflowOnly: true, + ScheduleID: 2, + Version: 3, + VisibilityTimestamp: now, + }, + }, + { + name: "TimerTaskInfo", + data: &TimerTaskInfo{ + DomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + WorkflowID: "test_workflow_id", + RunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + TaskType: 1, + TimeoutType: common.Int16Ptr(2), + Version: 3, + ScheduleAttempt: 4, + EventID: 5, + }, + }, + { + name: "ReplicationTaskInfo", + data: &ReplicationTaskInfo{ + DomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + WorkflowID: "test_workflow_id", + RunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + TaskType: 1, + Version: 2, + FirstEventID: 3, + NextEventID: 4, + ScheduledID: 5, + EventStoreVersion: 6, + NewRunEventStoreVersion: 7, + BranchToken: []byte("test_branch_token"), + NewRunBranchToken: []byte("test_new_run_branch_token"), + CreationTimestamp: now, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Encode the data using the parser + blob := encodeWithParser(t, parser, tc.data) + + // Decode the data using the parser and verify it matches + decoded := decodeWithParser(t, parser, blob, tc.data) + assert.Equal(t, tc.data, decoded) + }) + } +} + +func TestSnappyThriftDecoderErrorHandling(t *testing.T) { + decoder := newSnappyThriftDecoder() + + testCases := []struct { + name string + data []byte + decodeFunc func([]byte) (interface{}, error) + expectError bool + }{ + { + name: "Invalid snappy data for ShardInfo", + data: []byte("invalid snappy data"), + decodeFunc: func(data []byte) (interface{}, error) { + return decoder.shardInfoFromBlob(data) + }, + expectError: true, + }, + { + name: "Empty data for DomainInfo", + data: []byte{}, + decodeFunc: func(data []byte) (interface{}, error) { + return decoder.domainInfoFromBlob(data) + }, + expectError: true, + }, + { + name: "Corrupted snappy data for ActivityInfo", + data: []byte{0xff, 0xff, 0xff, 0xff}, + decodeFunc: func(data []byte) (interface{}, error) { + return decoder.activityInfoFromBlob(data) + }, + expectError: true, + }, + { + name: "Valid snappy but invalid thrift for WorkflowExecutionInfo", + data: func() []byte { + // Create valid snappy compressed data but with invalid thrift content + invalidThrift := []byte("not thrift data") + compressed := snappy.Encode(nil, invalidThrift) + return compressed + }(), + decodeFunc: func(data []byte) (interface{}, error) { + return decoder.workflowExecutionInfoFromBlob(data) + }, + expectError: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result, err := tc.decodeFunc(tc.data) + if tc.expectError { + assert.Error(t, err) + assert.Nil(t, result) + } else { + assert.NoError(t, err) + assert.NotNil(t, result) + } + }) + } +} + +func TestSnappyThriftRWDecode(t *testing.T) { + // Test the low-level snappyThriftRWDecode function directly + + t.Run("Valid data", func(t *testing.T) { + // Create a simple thrift struct and encode it + encoder := newSnappyThriftEncoder() + shardInfo := &ShardInfo{ + StolenSinceRenew: 1, + UpdatedAt: time.Now(), + ReplicationAckLevel: 2, + TransferAckLevel: 3, + } + + encoded, err := encoder.shardInfoToBlob(shardInfo) + require.NoError(t, err) + + // Now decode it back + decoder := newSnappyThriftDecoder() + decoded, err := decoder.shardInfoFromBlob(encoded) + require.NoError(t, err) + + assert.Equal(t, shardInfo.StolenSinceRenew, decoded.StolenSinceRenew) + assert.Equal(t, shardInfo.ReplicationAckLevel, decoded.ReplicationAckLevel) + assert.Equal(t, shardInfo.TransferAckLevel, decoded.TransferAckLevel) + }) + + t.Run("Invalid snappy compression", func(t *testing.T) { + decoder := newSnappyThriftDecoder() + + // Test with invalid snappy data + invalidData := []byte("this is not snappy compressed data") + _, err := decoder.shardInfoFromBlob(invalidData) + assert.Error(t, err) + assert.Contains(t, err.Error(), "snappy") + }) + + t.Run("Valid snappy but invalid thrift", func(t *testing.T) { + decoder := newSnappyThriftDecoder() + + // Create valid snappy data but with invalid thrift content + invalidThrift := []byte("not a valid thrift message") + compressed := snappy.Encode(nil, invalidThrift) + + _, err := decoder.shardInfoFromBlob(compressed) + assert.Error(t, err) + }) +} + +func TestSnappyThriftDecoderInterface(t *testing.T) { + // Verify that snappyThriftDecoder implements the decoder interface + var _ decoder = (*snappyThriftDecoder)(nil) + + // Test that newSnappyThriftDecoder returns a valid decoder + decoder := newSnappyThriftDecoder() + assert.NotNil(t, decoder) + assert.IsType(t, &snappyThriftDecoder{}, decoder) +} + +func TestSnappyThriftDecoderNilHandling(t *testing.T) { + decoder := newSnappyThriftDecoder() + + // Test each method with nil data + testCases := []struct { + name string + decodeFunc func() (interface{}, error) + }{ + { + name: "shardInfoFromBlob with nil", + decodeFunc: func() (interface{}, error) { + return decoder.shardInfoFromBlob(nil) + }, + }, + { + name: "domainInfoFromBlob with nil", + decodeFunc: func() (interface{}, error) { + return decoder.domainInfoFromBlob(nil) + }, + }, + { + name: "activityInfoFromBlob with nil", + decodeFunc: func() (interface{}, error) { + return decoder.activityInfoFromBlob(nil) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result, err := tc.decodeFunc() + assert.Error(t, err) + assert.Nil(t, result) + }) + } +} + +// Helper functions for encoding and decoding with parser +func encodeWithParser(t *testing.T, parser Parser, data interface{}) []byte { + var blob []byte + var err error + + switch v := data.(type) { + case *ShardInfo: + db, e := parser.ShardInfoToBlob(v) + require.NoError(t, e) + blob = db.Data + case *DomainInfo: + db, e := parser.DomainInfoToBlob(v) + require.NoError(t, e) + blob = db.Data + case *HistoryTreeInfo: + db, e := parser.HistoryTreeInfoToBlob(v) + require.NoError(t, e) + blob = db.Data + case *WorkflowExecutionInfo: + db, e := parser.WorkflowExecutionInfoToBlob(v) + require.NoError(t, e) + blob = db.Data + case *ActivityInfo: + db, e := parser.ActivityInfoToBlob(v) + require.NoError(t, e) + blob = db.Data + case *ChildExecutionInfo: + db, e := parser.ChildExecutionInfoToBlob(v) + require.NoError(t, e) + blob = db.Data + case *SignalInfo: + db, e := parser.SignalInfoToBlob(v) + require.NoError(t, e) + blob = db.Data + case *RequestCancelInfo: + db, e := parser.RequestCancelInfoToBlob(v) + require.NoError(t, e) + blob = db.Data + case *TimerInfo: + db, e := parser.TimerInfoToBlob(v) + require.NoError(t, e) + blob = db.Data + case *TaskInfo: + db, e := parser.TaskInfoToBlob(v) + require.NoError(t, e) + blob = db.Data + case *TaskListInfo: + db, e := parser.TaskListInfoToBlob(v) + require.NoError(t, e) + blob = db.Data + case *TransferTaskInfo: + db, e := parser.TransferTaskInfoToBlob(v) + require.NoError(t, e) + blob = db.Data + case *TimerTaskInfo: + db, e := parser.TimerTaskInfoToBlob(v) + require.NoError(t, e) + blob = db.Data + case *ReplicationTaskInfo: + db, e := parser.ReplicationTaskInfoToBlob(v) + require.NoError(t, e) + blob = db.Data + default: + t.Fatalf("Unknown type %T", v) + } + + require.NoError(t, err) + require.NotEmpty(t, blob) + return blob +} + +func decodeWithParser(t *testing.T, parser Parser, blob []byte, data interface{}) interface{} { + var result interface{} + var err error + + encoding := string(constants.EncodingTypeThriftRWSnappy) + + switch data.(type) { + case *ShardInfo: + result, err = parser.ShardInfoFromBlob(blob, encoding) + case *DomainInfo: + result, err = parser.DomainInfoFromBlob(blob, encoding) + case *HistoryTreeInfo: + result, err = parser.HistoryTreeInfoFromBlob(blob, encoding) + case *WorkflowExecutionInfo: + result, err = parser.WorkflowExecutionInfoFromBlob(blob, encoding) + case *ActivityInfo: + result, err = parser.ActivityInfoFromBlob(blob, encoding) + case *ChildExecutionInfo: + result, err = parser.ChildExecutionInfoFromBlob(blob, encoding) + case *SignalInfo: + result, err = parser.SignalInfoFromBlob(blob, encoding) + case *RequestCancelInfo: + result, err = parser.RequestCancelInfoFromBlob(blob, encoding) + case *TimerInfo: + result, err = parser.TimerInfoFromBlob(blob, encoding) + case *TaskInfo: + result, err = parser.TaskInfoFromBlob(blob, encoding) + case *TaskListInfo: + result, err = parser.TaskListInfoFromBlob(blob, encoding) + case *TransferTaskInfo: + result, err = parser.TransferTaskInfoFromBlob(blob, encoding) + case *TimerTaskInfo: + result, err = parser.TimerTaskInfoFromBlob(blob, encoding) + case *ReplicationTaskInfo: + result, err = parser.ReplicationTaskInfoFromBlob(blob, encoding) + default: + t.Fatalf("Unknown type %T", data) + } + + require.NoError(t, err) + require.NotNil(t, result) + return result +} diff --git a/common/persistence/serialization/snappy_thrift_encoder.go b/common/persistence/serialization/snappy_thrift_encoder.go new file mode 100644 index 00000000000..49893d6e794 --- /dev/null +++ b/common/persistence/serialization/snappy_thrift_encoder.go @@ -0,0 +1,113 @@ +// The MIT License (MIT) +// +// Copyright (c) 2017-2020 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package serialization + +import ( + "bytes" + + "github.com/golang/snappy" + "go.uber.org/thriftrw/protocol/binary" + + "github.com/uber/cadence/common/constants" +) + +type snappyThriftEncoder struct{} + +func newSnappyThriftEncoder() encoder { + return &snappyThriftEncoder{} +} + +func (e *snappyThriftEncoder) shardInfoToBlob(info *ShardInfo) ([]byte, error) { + return snappyThriftRWEncode(shardInfoToThrift(info)) +} + +func (e *snappyThriftEncoder) domainInfoToBlob(info *DomainInfo) ([]byte, error) { + return snappyThriftRWEncode(domainInfoToThrift(info)) +} + +func (e *snappyThriftEncoder) historyTreeInfoToBlob(info *HistoryTreeInfo) ([]byte, error) { + return snappyThriftRWEncode(historyTreeInfoToThrift(info)) +} + +func (e *snappyThriftEncoder) workflowExecutionInfoToBlob(info *WorkflowExecutionInfo) ([]byte, error) { + return snappyThriftRWEncode(workflowExecutionInfoToThrift(info)) +} + +func (e *snappyThriftEncoder) activityInfoToBlob(info *ActivityInfo) ([]byte, error) { + return snappyThriftRWEncode(activityInfoToThrift(info)) +} + +func (e *snappyThriftEncoder) childExecutionInfoToBlob(info *ChildExecutionInfo) ([]byte, error) { + return snappyThriftRWEncode(childExecutionInfoToThrift(info)) +} + +func (e *snappyThriftEncoder) signalInfoToBlob(info *SignalInfo) ([]byte, error) { + return snappyThriftRWEncode(signalInfoToThrift(info)) +} + +func (e *snappyThriftEncoder) requestCancelInfoToBlob(info *RequestCancelInfo) ([]byte, error) { + return snappyThriftRWEncode(requestCancelInfoToThrift(info)) +} + +func (e *snappyThriftEncoder) timerInfoToBlob(info *TimerInfo) ([]byte, error) { + return snappyThriftRWEncode(timerInfoToThrift(info)) +} + +func (e *snappyThriftEncoder) taskInfoToBlob(info *TaskInfo) ([]byte, error) { + return snappyThriftRWEncode(taskInfoToThrift(info)) +} + +func (e *snappyThriftEncoder) taskListInfoToBlob(info *TaskListInfo) ([]byte, error) { + return snappyThriftRWEncode(taskListInfoToThrift(info)) +} + +func (e *snappyThriftEncoder) transferTaskInfoToBlob(info *TransferTaskInfo) ([]byte, error) { + return snappyThriftRWEncode(transferTaskInfoToThrift(info)) +} + +func (e *snappyThriftEncoder) crossClusterTaskInfoToBlob(info *CrossClusterTaskInfo) ([]byte, error) { + return snappyThriftRWEncode(crossClusterTaskInfoToThrift(info)) +} + +func (e *snappyThriftEncoder) timerTaskInfoToBlob(info *TimerTaskInfo) ([]byte, error) { + return snappyThriftRWEncode(timerTaskInfoToThrift(info)) +} + +func (e *snappyThriftEncoder) replicationTaskInfoToBlob(info *ReplicationTaskInfo) ([]byte, error) { + return snappyThriftRWEncode(replicationTaskInfoToThrift(info)) +} + +func (e *snappyThriftEncoder) encodingType() constants.EncodingType { + return constants.EncodingTypeThriftRWSnappy +} + +func snappyThriftRWEncode(t thriftRWType) ([]byte, error) { + var b bytes.Buffer + sw := binary.Default.Writer(&b) + defer sw.Close() + if err := t.Encode(sw); err != nil { + return nil, err + } + + return snappy.Encode(nil, b.Bytes()), nil +} diff --git a/common/persistence/serialization/snappy_thrift_encoder_test.go b/common/persistence/serialization/snappy_thrift_encoder_test.go new file mode 100644 index 00000000000..581e56b2055 --- /dev/null +++ b/common/persistence/serialization/snappy_thrift_encoder_test.go @@ -0,0 +1,709 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package serialization + +import ( + "fmt" + "testing" + "time" + + "github.com/golang/snappy" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/constants" + "github.com/uber/cadence/common/types" +) + +func TestSnappyThriftEncoderRoundTrip(t *testing.T) { + encoder := newSnappyThriftEncoder() + decoder := newSnappyThriftDecoder() + + now := time.Now().Round(time.Second) + + testCases := []struct { + name string + data interface{} + encodeFunc func(interface{}) ([]byte, error) + decodeFunc func([]byte) (interface{}, error) + }{ + { + name: "ShardInfo", + data: &ShardInfo{ + StolenSinceRenew: 1, + UpdatedAt: now, + ReplicationAckLevel: 1, + TransferAckLevel: 1, + TimerAckLevel: now, + DomainNotificationVersion: 1, + ClusterTransferAckLevel: map[string]int64{"test": 1}, + ClusterTimerAckLevel: map[string]time.Time{"test": now}, + TransferProcessingQueueStates: []byte{1, 2, 3}, + TimerProcessingQueueStates: []byte{1, 2, 3}, + Owner: "owner", + ClusterReplicationLevel: map[string]int64{"test": 1}, + PendingFailoverMarkers: []byte{2, 3, 4}, + PendingFailoverMarkersEncoding: "", + TransferProcessingQueueStatesEncoding: "", + TimerProcessingQueueStatesEncoding: "", + }, + encodeFunc: func(data interface{}) ([]byte, error) { + return encoder.shardInfoToBlob(data.(*ShardInfo)) + }, + decodeFunc: func(data []byte) (interface{}, error) { + return decoder.shardInfoFromBlob(data) + }, + }, + { + name: "DomainInfo", + data: &DomainInfo{ + Name: "test", + Description: "test_desc", + Owner: "test_owner", + Status: 1, + Retention: 48 * time.Hour, + EmitMetric: true, + ArchivalBucket: "test_bucket", + ArchivalStatus: 1, + ConfigVersion: 1, + FailoverVersion: 1, + NotificationVersion: 1, + FailoverNotificationVersion: 1, + ActiveClusterName: "test_active_cluster", + Clusters: []string{"test_active_cluster", "test_standby_cluster"}, + Data: map[string]string{"test_key": "test_value"}, + BadBinaries: []byte{1, 2, 3}, + BadBinariesEncoding: "", + HistoryArchivalStatus: 1, + HistoryArchivalURI: "test_history_archival_uri", + VisibilityArchivalStatus: 1, + VisibilityArchivalURI: "test_visibility_archival_uri", + }, + encodeFunc: func(data interface{}) ([]byte, error) { + return encoder.domainInfoToBlob(data.(*DomainInfo)) + }, + decodeFunc: func(data []byte) (interface{}, error) { + return decoder.domainInfoFromBlob(data) + }, + }, + { + name: "HistoryTreeInfo", + data: &HistoryTreeInfo{ + CreatedTimestamp: now, + Ancestors: []*types.HistoryBranchRange{ + { + BranchID: "test_branch_id1", + }, + { + BranchID: "test_branch_id2", + }, + }, + Info: "test_info", + }, + encodeFunc: func(data interface{}) ([]byte, error) { + return encoder.historyTreeInfoToBlob(data.(*HistoryTreeInfo)) + }, + decodeFunc: func(data []byte) (interface{}, error) { + return decoder.historyTreeInfoFromBlob(data) + }, + }, + { + name: "WorkflowExecutionInfo", + data: &WorkflowExecutionInfo{ + ParentDomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + ParentWorkflowID: "test_parent_workflow_id", + ParentRunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + InitiatedID: 1, + CompletionEventBatchID: common.Int64Ptr(2), + CompletionEvent: []byte("test_completion_event"), + CompletionEventEncoding: "test_completion_event_encoding", + TaskList: "test_task_list", + WorkflowTypeName: "test_workflow_type", + WorkflowTimeout: 10 * time.Second, + DecisionTaskTimeout: 5 * time.Second, + ExecutionContext: []byte("test_execution_context"), + State: 1, + CloseStatus: 1, + StartVersion: 1, + LastWriteEventID: common.Int64Ptr(3), + LastEventTaskID: 4, + LastFirstEventID: 5, + LastProcessedEvent: 6, + StartTimestamp: now, + LastUpdatedTimestamp: now, + CreateRequestID: "test_create_request_id", + DecisionVersion: 7, + DecisionScheduleID: 8, + DecisionStartedID: 9, + DecisionRequestID: "test_decision_request_id", + DecisionTimeout: 3 * time.Second, + DecisionAttempt: 10, + DecisionStartedTimestamp: now, + DecisionScheduledTimestamp: now, + DecisionOriginalScheduledTimestamp: now, + CancelRequested: true, + CancelRequestID: "test_cancel_request_id", + StickyTaskList: "test_sticky_task_list", + StickyScheduleToStartTimeout: 2 * time.Second, + RetryAttempt: 11, + RetryInitialInterval: 1 * time.Second, + RetryMaximumInterval: 30 * time.Second, + RetryMaximumAttempts: 3, + RetryExpiration: time.Hour, + RetryBackoffCoefficient: 2.0, + RetryExpirationTimestamp: now, + RetryNonRetryableErrors: []string{"test_error"}, + HasRetryPolicy: true, + CronSchedule: "test_cron", + IsCron: true, + EventStoreVersion: 12, + EventBranchToken: []byte("test_branch_token"), + SignalCount: 13, + HistorySize: 14, + ClientLibraryVersion: "test_client_version", + ClientFeatureVersion: "test_feature_version", + ClientImpl: "test_client_impl", + AutoResetPoints: []byte("test_reset_points"), + AutoResetPointsEncoding: "test_reset_points_encoding", + SearchAttributes: map[string][]byte{"test_key": []byte("test_value")}, + Memo: map[string][]byte{"test_memo": []byte("test_memo_value")}, + VersionHistories: []byte("test_version_histories"), + VersionHistoriesEncoding: "test_version_histories_encoding", + FirstExecutionRunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + }, + encodeFunc: func(data interface{}) ([]byte, error) { + return encoder.workflowExecutionInfoToBlob(data.(*WorkflowExecutionInfo)) + }, + decodeFunc: func(data []byte) (interface{}, error) { + return decoder.workflowExecutionInfoFromBlob(data) + }, + }, + { + name: "ActivityInfo", + data: &ActivityInfo{ + Version: 1, + ScheduledEventBatchID: 2, + ScheduledEvent: []byte("test_scheduled_event"), + ScheduledEventEncoding: "test_scheduled_encoding", + ScheduledTimestamp: now, + StartedID: 3, + StartedEvent: []byte("test_started_event"), + StartedEventEncoding: "test_started_encoding", + StartedTimestamp: now, + ActivityID: "test_activity_id", + RequestID: "test_request_id", + ScheduleToStartTimeout: 5 * time.Second, + ScheduleToCloseTimeout: 10 * time.Second, + StartToCloseTimeout: 8 * time.Second, + HeartbeatTimeout: 3 * time.Second, + CancelRequested: true, + CancelRequestID: 4, + TimerTaskStatus: 5, + Attempt: 6, + TaskList: "test_task_list", + StartedIdentity: "test_identity", + HasRetryPolicy: true, + RetryInitialInterval: 1 * time.Second, + RetryMaximumInterval: 30 * time.Second, + RetryMaximumAttempts: 3, + RetryExpirationTimestamp: now, + RetryBackoffCoefficient: 2.0, + RetryNonRetryableErrors: []string{"test_error"}, + RetryLastFailureReason: "test_failure_reason", + RetryLastWorkerIdentity: "test_worker_identity", + RetryLastFailureDetails: []byte("test_failure_details"), + }, + encodeFunc: func(data interface{}) ([]byte, error) { + return encoder.activityInfoToBlob(data.(*ActivityInfo)) + }, + decodeFunc: func(data []byte) (interface{}, error) { + return decoder.activityInfoFromBlob(data) + }, + }, + { + name: "ChildExecutionInfo", + data: &ChildExecutionInfo{ + Version: 1, + InitiatedEventBatchID: 2, + StartedID: 3, + InitiatedEvent: []byte("test_initiated_event"), + InitiatedEventEncoding: "test_initiated_encoding", + StartedWorkflowID: "test_started_workflow_id", + StartedRunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + StartedEvent: []byte("test_started_event"), + StartedEventEncoding: "test_started_encoding", + CreateRequestID: "test_create_request_id", + DomainID: "test_domain_id", + DomainNameDEPRECATED: "test_domain_name", + WorkflowTypeName: "test_workflow_type", + ParentClosePolicy: 4, + }, + encodeFunc: func(data interface{}) ([]byte, error) { + return encoder.childExecutionInfoToBlob(data.(*ChildExecutionInfo)) + }, + decodeFunc: func(data []byte) (interface{}, error) { + return decoder.childExecutionInfoFromBlob(data) + }, + }, + { + name: "SignalInfo", + data: &SignalInfo{ + Version: 1, + InitiatedEventBatchID: 2, + RequestID: "test_request_id", + Name: "test_signal_name", + Input: []byte("test_input"), + Control: []byte("test_control"), + }, + encodeFunc: func(data interface{}) ([]byte, error) { + return encoder.signalInfoToBlob(data.(*SignalInfo)) + }, + decodeFunc: func(data []byte) (interface{}, error) { + return decoder.signalInfoFromBlob(data) + }, + }, + { + name: "RequestCancelInfo", + data: &RequestCancelInfo{ + Version: 1, + InitiatedEventBatchID: 2, + CancelRequestID: "test_cancel_request_id", + }, + encodeFunc: func(data interface{}) ([]byte, error) { + return encoder.requestCancelInfoToBlob(data.(*RequestCancelInfo)) + }, + decodeFunc: func(data []byte) (interface{}, error) { + return decoder.requestCancelInfoFromBlob(data) + }, + }, + { + name: "TimerInfo", + data: &TimerInfo{ + Version: 1, + StartedID: 2, + ExpiryTimestamp: now, + TaskID: 3, + }, + encodeFunc: func(data interface{}) ([]byte, error) { + return encoder.timerInfoToBlob(data.(*TimerInfo)) + }, + decodeFunc: func(data []byte) (interface{}, error) { + return decoder.timerInfoFromBlob(data) + }, + }, + { + name: "TaskInfo", + data: &TaskInfo{ + WorkflowID: "test_workflow_id", + RunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + ScheduleID: 1, + ExpiryTimestamp: now, + CreatedTimestamp: now, + PartitionConfig: map[string]string{"test_key": "test_value"}, + }, + encodeFunc: func(data interface{}) ([]byte, error) { + return encoder.taskInfoToBlob(data.(*TaskInfo)) + }, + decodeFunc: func(data []byte) (interface{}, error) { + return decoder.taskInfoFromBlob(data) + }, + }, + { + name: "TaskListInfo", + data: &TaskListInfo{ + Kind: 1, + AckLevel: 2, + ExpiryTimestamp: now, + LastUpdated: now, + }, + encodeFunc: func(data interface{}) ([]byte, error) { + return encoder.taskListInfoToBlob(data.(*TaskListInfo)) + }, + decodeFunc: func(data []byte) (interface{}, error) { + return decoder.taskListInfoFromBlob(data) + }, + }, + { + name: "TransferTaskInfo", + data: &TransferTaskInfo{ + DomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + WorkflowID: "test_workflow_id", + RunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + TaskType: 1, + TargetDomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + TargetDomainIDs: []UUID{MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8")}, + TargetWorkflowID: "test_target_workflow_id", + TargetRunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + TaskList: "test_task_list", + TargetChildWorkflowOnly: true, + ScheduleID: 2, + Version: 3, + VisibilityTimestamp: now, + }, + encodeFunc: func(data interface{}) ([]byte, error) { + return encoder.transferTaskInfoToBlob(data.(*TransferTaskInfo)) + }, + decodeFunc: func(data []byte) (interface{}, error) { + return decoder.transferTaskInfoFromBlob(data) + }, + }, + { + name: "TimerTaskInfo", + data: &TimerTaskInfo{ + DomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + WorkflowID: "test_workflow_id", + RunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + TaskType: 1, + TimeoutType: common.Int16Ptr(2), + Version: 3, + ScheduleAttempt: 4, + EventID: 5, + }, + encodeFunc: func(data interface{}) ([]byte, error) { + return encoder.timerTaskInfoToBlob(data.(*TimerTaskInfo)) + }, + decodeFunc: func(data []byte) (interface{}, error) { + return decoder.timerTaskInfoFromBlob(data) + }, + }, + { + name: "ReplicationTaskInfo", + data: &ReplicationTaskInfo{ + DomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + WorkflowID: "test_workflow_id", + RunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + TaskType: 1, + Version: 2, + FirstEventID: 3, + NextEventID: 4, + ScheduledID: 5, + EventStoreVersion: 6, + NewRunEventStoreVersion: 7, + BranchToken: []byte("test_branch_token"), + NewRunBranchToken: []byte("test_new_run_branch_token"), + CreationTimestamp: now, + }, + encodeFunc: func(data interface{}) ([]byte, error) { + return encoder.replicationTaskInfoToBlob(data.(*ReplicationTaskInfo)) + }, + decodeFunc: func(data []byte) (interface{}, error) { + return decoder.replicationTaskInfoFromBlob(data) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Encode the data using the encoder + encoded, err := tc.encodeFunc(tc.data) + require.NoError(t, err) + require.NotEmpty(t, encoded) + + // Verify the data is snappy compressed + assert.True(t, isValidSnappyData(encoded), "encoded data should be valid snappy compressed data") + + // Decode the data using the decoder and verify it matches + decoded, err := tc.decodeFunc(encoded) + require.NoError(t, err) + assert.Equal(t, tc.data, decoded) + }) + } +} + +func TestSnappyThriftEncoderEncodingType(t *testing.T) { + encoder := newSnappyThriftEncoder() + + encodingType := encoder.encodingType() + assert.Equal(t, constants.EncodingTypeThriftRWSnappy, encodingType) +} + +func TestSnappyThriftEncoderInterface(t *testing.T) { + // Verify that snappyThriftEncoder implements the encoder interface + var _ encoder = (*snappyThriftEncoder)(nil) + + // Test that newSnappyThriftEncoder returns a valid encoder + encoder := newSnappyThriftEncoder() + assert.NotNil(t, encoder) + assert.IsType(t, &snappyThriftEncoder{}, encoder) +} + +func TestSnappyThriftEncoderNilHandling(t *testing.T) { + encoder := newSnappyThriftEncoder() + + // Test each method with nil data + testCases := []struct { + name string + encodeFunc func() ([]byte, error) + }{ + { + name: "shardInfoToBlob with nil", + encodeFunc: func() ([]byte, error) { + return encoder.shardInfoToBlob(nil) + }, + }, + { + name: "domainInfoToBlob with nil", + encodeFunc: func() ([]byte, error) { + return encoder.domainInfoToBlob(nil) + }, + }, + { + name: "activityInfoToBlob with nil", + encodeFunc: func() ([]byte, error) { + return encoder.activityInfoToBlob(nil) + }, + }, + { + name: "workflowExecutionInfoToBlob with nil", + encodeFunc: func() ([]byte, error) { + return encoder.workflowExecutionInfoToBlob(nil) + }, + }, + { + name: "childExecutionInfoToBlob with nil", + encodeFunc: func() ([]byte, error) { + return encoder.childExecutionInfoToBlob(nil) + }, + }, + { + name: "signalInfoToBlob with nil", + encodeFunc: func() ([]byte, error) { + return encoder.signalInfoToBlob(nil) + }, + }, + { + name: "requestCancelInfoToBlob with nil", + encodeFunc: func() ([]byte, error) { + return encoder.requestCancelInfoToBlob(nil) + }, + }, + { + name: "timerInfoToBlob with nil", + encodeFunc: func() ([]byte, error) { + return encoder.timerInfoToBlob(nil) + }, + }, + { + name: "taskInfoToBlob with nil", + encodeFunc: func() ([]byte, error) { + return encoder.taskInfoToBlob(nil) + }, + }, + { + name: "taskListInfoToBlob with nil", + encodeFunc: func() ([]byte, error) { + return encoder.taskListInfoToBlob(nil) + }, + }, + { + name: "transferTaskInfoToBlob with nil", + encodeFunc: func() ([]byte, error) { + return encoder.transferTaskInfoToBlob(nil) + }, + }, + { + name: "crossClusterTaskInfoToBlob with nil", + encodeFunc: func() ([]byte, error) { + return encoder.crossClusterTaskInfoToBlob(nil) + }, + }, + { + name: "timerTaskInfoToBlob with nil", + encodeFunc: func() ([]byte, error) { + return encoder.timerTaskInfoToBlob(nil) + }, + }, + { + name: "replicationTaskInfoToBlob with nil", + encodeFunc: func() ([]byte, error) { + return encoder.replicationTaskInfoToBlob(nil) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var result []byte + var err error + var panicRecovered bool + + // Use defer/recover to handle panic as expected behavior for nil input + func() { + defer func() { + if r := recover(); r != nil { + panicRecovered = true + } + }() + result, err = tc.encodeFunc() + }() + + // Nil input should either produce an error, panic, or valid empty data + if panicRecovered { + // Panic is expected for nil input + assert.True(t, true, "nil input caused expected panic") + } else if err != nil { + assert.Error(t, err) + assert.Nil(t, result) + } else { + // If no error, result should be valid snappy data + assert.NotNil(t, result) + assert.True(t, isValidSnappyData(result), "nil input should produce valid snappy data") + } + }) + } +} + +func TestSnappyThriftRWEncode(t *testing.T) { + // Test the low-level snappyThriftRWEncode function directly + + t.Run("Valid thrift object", func(t *testing.T) { + // Create a simple thrift struct and encode it + shardInfo := &ShardInfo{ + StolenSinceRenew: 1, + UpdatedAt: time.Now(), + ReplicationAckLevel: 2, + TransferAckLevel: 3, + } + + // Convert to thrift format + thriftStruct := shardInfoToThrift(shardInfo) + + // Encode using the low-level function + encoded, err := snappyThriftRWEncode(thriftStruct) + require.NoError(t, err) + require.NotEmpty(t, encoded) + + // Verify it's valid snappy data + assert.True(t, isValidSnappyData(encoded)) + + // Verify we can decode it back + decoder := newSnappyThriftDecoder() + decoded, err := decoder.shardInfoFromBlob(encoded) + require.NoError(t, err) + + assert.Equal(t, shardInfo.StolenSinceRenew, decoded.StolenSinceRenew) + assert.Equal(t, shardInfo.ReplicationAckLevel, decoded.ReplicationAckLevel) + assert.Equal(t, shardInfo.TransferAckLevel, decoded.TransferAckLevel) + }) + + t.Run("Nil thrift object", func(t *testing.T) { + // Test with nil thrift object (converted from nil input) + thriftStruct := shardInfoToThrift(nil) + assert.Nil(t, thriftStruct) + + var err error + var panicRecovered bool + + // Use defer/recover to handle panic as expected behavior for nil input + func() { + defer func() { + if r := recover(); r != nil { + panicRecovered = true + } + }() + _, err = snappyThriftRWEncode(thriftStruct) + }() + + // Either panic or error is acceptable for nil input + if panicRecovered { + assert.True(t, true, "nil thrift object caused expected panic") + } else if err != nil { + assert.Error(t, err) + } + }) +} + +func TestSnappyThriftEncoderWithParser(t *testing.T) { + // Test encoder integration with parser + parser, err := NewParser(constants.EncodingTypeThriftRWSnappy, constants.EncodingTypeThriftRWSnappy) + require.NoError(t, err) + + now := time.Now().Round(time.Second) + + testData := &ShardInfo{ + StolenSinceRenew: 1, + UpdatedAt: now, + ReplicationAckLevel: 2, + TransferAckLevel: 3, + TimerAckLevel: now, + DomainNotificationVersion: 4, + Owner: "test_owner", + } + + // Encode using parser + blob, err := parser.ShardInfoToBlob(testData) + require.NoError(t, err) + assert.Equal(t, constants.EncodingTypeThriftRWSnappy, blob.Encoding) + assert.NotEmpty(t, blob.Data) + assert.True(t, isValidSnappyData(blob.Data)) + + // Decode using parser + decoded, err := parser.ShardInfoFromBlob(blob.Data, string(blob.Encoding)) + require.NoError(t, err) + assert.Equal(t, testData, decoded) +} + +func TestSnappyThriftEncoderDataCompression(t *testing.T) { + encoder := newSnappyThriftEncoder() + + // Create a large data structure to test compression + largeData := &WorkflowExecutionInfo{ + WorkflowTypeName: "very_long_workflow_type_name_that_should_compress_well_when_repeated", + TaskList: "very_long_task_list_name_that_should_compress_well_when_repeated", + ExecutionContext: make([]byte, 1000), // Large byte array + SearchAttributes: make(map[string][]byte), + Memo: make(map[string][]byte), + } + + // Fill with repetitive data that should compress well + for i := 0; i < 100; i++ { + key := fmt.Sprintf("repetitive_key_that_compresses_well_%d", i) + value := []byte("repetitive_value_that_compresses_well_when_repeated_many_times") + largeData.SearchAttributes[key] = value + largeData.Memo[key] = value + } + + // Encode the data + encoded, err := encoder.workflowExecutionInfoToBlob(largeData) + require.NoError(t, err) + require.NotEmpty(t, encoded) + + // Verify it's compressed (should be significantly smaller than uncompressed) + assert.True(t, isValidSnappyData(encoded)) + + // Decode and verify correctness + decoder := newSnappyThriftDecoder() + decoded, err := decoder.workflowExecutionInfoFromBlob(encoded) + require.NoError(t, err) + assert.Equal(t, largeData.WorkflowTypeName, decoded.WorkflowTypeName) + assert.Equal(t, largeData.TaskList, decoded.TaskList) + assert.Len(t, decoded.SearchAttributes, 100) + assert.Len(t, decoded.Memo, 100) +} + +// Helper function to check if data is valid snappy compressed data +func isValidSnappyData(data []byte) bool { + _, err := snappy.Decode(nil, data) + return err == nil +} diff --git a/go.mod b/go.mod index 3540ba47b8c..637851de44b 100644 --- a/go.mod +++ b/go.mod @@ -104,7 +104,7 @@ require ( github.com/gogo/googleapis v1.3.2 // indirect github.com/gogo/status v1.1.0 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/golang/snappy v0.0.4 // indirect + github.com/golang/snappy v0.0.4 github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect From 93561a8d898957e0d23926e0f78f6cda5e6626e0 Mon Sep 17 00:00:00 2001 From: Ignat Tubylov Date: Thu, 24 Jul 2025 10:58:39 +0200 Subject: [PATCH 2/5] Move test payloads into a common file --- .../serialization/serialization_test_utils.go | 254 +++++++++++++++ .../snappy_thrift_decoder_test.go | 300 +----------------- .../snappy_thrift_encoder_test.go | 275 +--------------- 3 files changed, 284 insertions(+), 545 deletions(-) create mode 100644 common/persistence/serialization/serialization_test_utils.go diff --git a/common/persistence/serialization/serialization_test_utils.go b/common/persistence/serialization/serialization_test_utils.go new file mode 100644 index 00000000000..6c8580e0f47 --- /dev/null +++ b/common/persistence/serialization/serialization_test_utils.go @@ -0,0 +1,254 @@ +package serialization + +import ( + "time" + + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/types" +) + +var shardInfoTestData = &ShardInfo{ + StolenSinceRenew: 1, + UpdatedAt: time.Date(2025, 1, 1, 0, 0, 0, 0, time.Local), + ReplicationAckLevel: 1, + TransferAckLevel: 1, + TimerAckLevel: time.Date(2025, 1, 1, 0, 0, 0, 0, time.Local), + DomainNotificationVersion: 1, + ClusterTransferAckLevel: map[string]int64{"test": 1}, + ClusterTimerAckLevel: map[string]time.Time{"test": time.Date(2025, 1, 1, 0, 0, 0, 0, time.Local)}, + TransferProcessingQueueStates: []byte{1, 2, 3}, + TimerProcessingQueueStates: []byte{1, 2, 3}, + Owner: "owner", + ClusterReplicationLevel: map[string]int64{"test": 1}, + PendingFailoverMarkers: []byte{2, 3, 4}, + PendingFailoverMarkersEncoding: "", + TransferProcessingQueueStatesEncoding: "", + TimerProcessingQueueStatesEncoding: "", +} + +var domainInfoTestData = &DomainInfo{ + Description: "test_desc", + Owner: "test_owner", + Status: 1, + Retention: 48 * time.Hour, + EmitMetric: true, + ArchivalBucket: "test_bucket", + ArchivalStatus: 1, + ConfigVersion: 1, + FailoverVersion: 1, + NotificationVersion: 1, + FailoverNotificationVersion: 1, + ActiveClusterName: "test_active_cluster", + Clusters: []string{"test_active_cluster", "test_standby_cluster"}, + Data: map[string]string{"test_key": "test_value"}, + BadBinaries: []byte{1, 2, 3}, + BadBinariesEncoding: "", + HistoryArchivalStatus: 1, + HistoryArchivalURI: "test_history_archival_uri", + VisibilityArchivalStatus: 1, + VisibilityArchivalURI: "test_visibility_archival_uri", +} + +var historyTreeInfoTestData = &HistoryTreeInfo{ + CreatedTimestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.Local), + Ancestors: []*types.HistoryBranchRange{ + { + BranchID: "test_branch_id1", + }, + }, +} + +var workflowExecutionInfoTestData = &WorkflowExecutionInfo{ + ParentDomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + ParentWorkflowID: "test_parent_workflow_id", + ParentRunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + InitiatedID: 1, + CompletionEventBatchID: common.Int64Ptr(2), + CompletionEvent: []byte("test_completion_event"), + CompletionEventEncoding: "test_completion_event_encoding", + TaskList: "test_task_list", + WorkflowTypeName: "test_workflow_type", + WorkflowTimeout: 10 * time.Second, + DecisionTaskTimeout: 5 * time.Second, + ExecutionContext: []byte("test_execution_context"), + State: 1, + CloseStatus: 1, + StartVersion: 1, + LastWriteEventID: common.Int64Ptr(3), + LastEventTaskID: 4, + LastFirstEventID: 5, + LastProcessedEvent: 6, + StartTimestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.Local), + LastUpdatedTimestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.Local), + CreateRequestID: "test_create_request_id", + DecisionVersion: 7, + DecisionScheduleID: 8, + DecisionStartedID: 9, + DecisionRequestID: "test_decision_request_id", + DecisionTimeout: 3 * time.Second, + DecisionAttempt: 10, + DecisionStartedTimestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.Local), + DecisionScheduledTimestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.Local), + DecisionOriginalScheduledTimestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.Local), + CancelRequested: true, + CancelRequestID: "test_cancel_request_id", + StickyTaskList: "test_sticky_task_list", + StickyScheduleToStartTimeout: 2 * time.Second, + RetryAttempt: 11, + RetryInitialInterval: 1 * time.Second, + RetryMaximumInterval: 30 * time.Second, + RetryMaximumAttempts: 3, + RetryExpiration: time.Hour, + RetryBackoffCoefficient: 2.0, + RetryExpirationTimestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.Local), + RetryNonRetryableErrors: []string{"test_error"}, + HasRetryPolicy: true, + CronSchedule: "test_cron", + IsCron: true, + EventStoreVersion: 12, + EventBranchToken: []byte("test_branch_token"), + SignalCount: 13, + HistorySize: 14, + ClientLibraryVersion: "test_client_version", + ClientFeatureVersion: "test_feature_version", + ClientImpl: "test_client_impl", + AutoResetPoints: []byte("test_reset_points"), + AutoResetPointsEncoding: "test_reset_points_encoding", + SearchAttributes: map[string][]byte{"test_key": []byte("test_value")}, + Memo: map[string][]byte{"test_memo": []byte("test_memo_value")}, + VersionHistories: []byte("test_version_histories"), + VersionHistoriesEncoding: "test_version_histories_encoding", + FirstExecutionRunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), +} + +var activityInfoTestData = &ActivityInfo{ + Version: 1, + ScheduledEventBatchID: 2, + ScheduledEvent: []byte("test_scheduled_event"), + ScheduledEventEncoding: "test_scheduled_encoding", + ScheduledTimestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.Local), + StartedID: 3, + StartedEvent: []byte("test_started_event"), + StartedEventEncoding: "test_started_encoding", + StartedTimestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.Local), + ActivityID: "test_activity_id", + RequestID: "test_request_id", + ScheduleToStartTimeout: 5 * time.Second, + ScheduleToCloseTimeout: 10 * time.Second, + StartToCloseTimeout: 8 * time.Second, + HeartbeatTimeout: 3 * time.Second, + CancelRequested: true, + CancelRequestID: 4, + TimerTaskStatus: 5, + Attempt: 6, + TaskList: "test_task_list", + StartedIdentity: "test_identity", + HasRetryPolicy: true, + RetryInitialInterval: 1 * time.Second, + RetryMaximumInterval: 30 * time.Second, + RetryMaximumAttempts: 3, + RetryExpirationTimestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.Local), + RetryBackoffCoefficient: 2.0, + RetryNonRetryableErrors: []string{"test_error"}, + RetryLastFailureReason: "test_failure_reason", + RetryLastWorkerIdentity: "test_worker_identity", + RetryLastFailureDetails: []byte("test_failure_details"), +} + +var childExecutionInfoTestData = &ChildExecutionInfo{ + Version: 1, + InitiatedEventBatchID: 2, + StartedID: 3, + InitiatedEvent: []byte("test_initiated_event"), + InitiatedEventEncoding: "test_initiated_encoding", + StartedWorkflowID: "test_started_workflow_id", + StartedRunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + StartedEvent: []byte("test_started_event"), + StartedEventEncoding: "test_started_encoding", + CreateRequestID: "test_create_request_id", + DomainID: "test_domain_id", + DomainNameDEPRECATED: "test_domain_name", + WorkflowTypeName: "test_workflow_type", + ParentClosePolicy: 4, +} + +var signalInfoTestData = &SignalInfo{ + Version: 1, + InitiatedEventBatchID: 2, + RequestID: "test_request_id", + Name: "test_signal_name", + Input: []byte("test_input"), + Control: []byte("test_control"), +} + +var requestCancelInfoTestData = &RequestCancelInfo{ + Version: 1, + InitiatedEventBatchID: 2, + CancelRequestID: "test_cancel_request_id", +} + +var timerInfoTestData = &TimerInfo{ + Version: 1, + StartedID: 2, + ExpiryTimestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.Local), + TaskID: 3, +} + +var taskInfoTestData = &TaskInfo{ + WorkflowID: "test_workflow_id", + RunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + ScheduleID: 1, + ExpiryTimestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.Local), + CreatedTimestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.Local), + PartitionConfig: map[string]string{"test_key": "test_value"}, +} + +var taskListInfoTestData = &TaskListInfo{ + Kind: 1, + AckLevel: 2, + ExpiryTimestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.Local), + LastUpdated: time.Date(2025, 1, 1, 0, 0, 0, 0, time.Local), +} + +var transferTaskInfoTestData = &TransferTaskInfo{ + DomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + WorkflowID: "test_workflow_id", + RunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + TaskType: 1, + TargetDomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + TargetDomainIDs: []UUID{MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8")}, + TargetWorkflowID: "test_target_workflow_id", + TargetRunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + TaskList: "test_task_list", + TargetChildWorkflowOnly: true, + ScheduleID: 2, + Version: 3, + VisibilityTimestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.Local), +} + +var timerTaskInfoTestData = &TimerTaskInfo{ + DomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + WorkflowID: "test_workflow_id", + RunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + TaskType: 1, + TimeoutType: common.Int16Ptr(2), + Version: 3, + ScheduleAttempt: 4, + EventID: 5, +} + +var replicationTaskInfoTestData = &ReplicationTaskInfo{ + DomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + WorkflowID: "test_workflow_id", + RunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + TaskType: 1, + Version: 2, + FirstEventID: 3, + NextEventID: 4, + ScheduledID: 5, + EventStoreVersion: 6, + NewRunEventStoreVersion: 7, + BranchToken: []byte("test_branch_token"), + NewRunBranchToken: []byte("test_new_run_branch_token"), + CreationTimestamp: time.Date(2025, 1, 1, 0, 0, 0, 0, time.Local), +} diff --git a/common/persistence/serialization/snappy_thrift_decoder_test.go b/common/persistence/serialization/snappy_thrift_decoder_test.go index 3867ea27c67..4f5456d8962 100644 --- a/common/persistence/serialization/snappy_thrift_decoder_test.go +++ b/common/persistence/serialization/snappy_thrift_decoder_test.go @@ -30,308 +30,36 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/uber/cadence/common" "github.com/uber/cadence/common/constants" - "github.com/uber/cadence/common/types" ) func TestSnappyThriftDecoderRoundTrip(t *testing.T) { parser, err := NewParser(constants.EncodingTypeThriftRWSnappy, constants.EncodingTypeThriftRWSnappy) require.NoError(t, err) - now := time.Now().Round(time.Second) - testCases := []struct { name string data interface{} }{ - { - name: "ShardInfo", - data: &ShardInfo{ - StolenSinceRenew: 1, - UpdatedAt: now, - ReplicationAckLevel: 1, - TransferAckLevel: 1, - TimerAckLevel: now, - DomainNotificationVersion: 1, - ClusterTransferAckLevel: map[string]int64{"test": 1}, - ClusterTimerAckLevel: map[string]time.Time{"test": now}, - TransferProcessingQueueStates: []byte{1, 2, 3}, - TimerProcessingQueueStates: []byte{1, 2, 3}, - Owner: "owner", - ClusterReplicationLevel: map[string]int64{"test": 1}, - PendingFailoverMarkers: []byte{2, 3, 4}, - PendingFailoverMarkersEncoding: "", - TransferProcessingQueueStatesEncoding: "", - TimerProcessingQueueStatesEncoding: "", - }, - }, - { - name: "DomainInfo", - data: &DomainInfo{ - Name: "test", - Description: "test_desc", - Owner: "test_owner", - Status: 1, - Retention: 48 * time.Hour, - EmitMetric: true, - ArchivalBucket: "test_bucket", - ArchivalStatus: 1, - ConfigVersion: 1, - FailoverVersion: 1, - NotificationVersion: 1, - FailoverNotificationVersion: 1, - ActiveClusterName: "test_active_cluster", - Clusters: []string{"test_active_cluster", "test_standby_cluster"}, - Data: map[string]string{"test_key": "test_value"}, - BadBinaries: []byte{1, 2, 3}, - BadBinariesEncoding: "", - HistoryArchivalStatus: 1, - HistoryArchivalURI: "test_history_archival_uri", - VisibilityArchivalStatus: 1, - VisibilityArchivalURI: "test_visibility_archival_uri", - }, - }, - { - name: "HistoryTreeInfo", - data: &HistoryTreeInfo{ - CreatedTimestamp: now, - Ancestors: []*types.HistoryBranchRange{ - { - BranchID: "test_branch_id1", - }, - { - BranchID: "test_branch_id2", - }, - }, - Info: "test_info", - }, - }, - { - name: "WorkflowExecutionInfo", - data: &WorkflowExecutionInfo{ - ParentDomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - ParentWorkflowID: "test_parent_workflow_id", - ParentRunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - InitiatedID: 1, - CompletionEventBatchID: common.Int64Ptr(2), - CompletionEvent: []byte("test_completion_event"), - CompletionEventEncoding: "test_completion_event_encoding", - TaskList: "test_task_list", - WorkflowTypeName: "test_workflow_type", - WorkflowTimeout: 10 * time.Second, - DecisionTaskTimeout: 5 * time.Second, - ExecutionContext: []byte("test_execution_context"), - State: 1, - CloseStatus: 1, - StartVersion: 1, - LastWriteEventID: common.Int64Ptr(3), - LastEventTaskID: 4, - LastFirstEventID: 5, - LastProcessedEvent: 6, - StartTimestamp: now, - LastUpdatedTimestamp: now, - CreateRequestID: "test_create_request_id", - DecisionVersion: 7, - DecisionScheduleID: 8, - DecisionStartedID: 9, - DecisionRequestID: "test_decision_request_id", - DecisionTimeout: 3 * time.Second, - DecisionAttempt: 10, - DecisionStartedTimestamp: now, - DecisionScheduledTimestamp: now, - DecisionOriginalScheduledTimestamp: now, - CancelRequested: true, - CancelRequestID: "test_cancel_request_id", - StickyTaskList: "test_sticky_task_list", - StickyScheduleToStartTimeout: 2 * time.Second, - RetryAttempt: 11, - RetryInitialInterval: 1 * time.Second, - RetryMaximumInterval: 30 * time.Second, - RetryMaximumAttempts: 3, - RetryExpiration: time.Hour, - RetryBackoffCoefficient: 2.0, - RetryExpirationTimestamp: now, - RetryNonRetryableErrors: []string{"test_error"}, - HasRetryPolicy: true, - CronSchedule: "test_cron", - IsCron: true, - EventStoreVersion: 12, - EventBranchToken: []byte("test_branch_token"), - SignalCount: 13, - HistorySize: 14, - ClientLibraryVersion: "test_client_version", - ClientFeatureVersion: "test_feature_version", - ClientImpl: "test_client_impl", - AutoResetPoints: []byte("test_reset_points"), - AutoResetPointsEncoding: "test_reset_points_encoding", - SearchAttributes: map[string][]byte{"test_key": []byte("test_value")}, - Memo: map[string][]byte{"test_memo": []byte("test_memo_value")}, - VersionHistories: []byte("test_version_histories"), - VersionHistoriesEncoding: "test_version_histories_encoding", - FirstExecutionRunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - }, - }, - { - name: "ActivityInfo", - data: &ActivityInfo{ - Version: 1, - ScheduledEventBatchID: 2, - ScheduledEvent: []byte("test_scheduled_event"), - ScheduledEventEncoding: "test_scheduled_encoding", - ScheduledTimestamp: now, - StartedID: 3, - StartedEvent: []byte("test_started_event"), - StartedEventEncoding: "test_started_encoding", - StartedTimestamp: now, - ActivityID: "test_activity_id", - RequestID: "test_request_id", - ScheduleToStartTimeout: 5 * time.Second, - ScheduleToCloseTimeout: 10 * time.Second, - StartToCloseTimeout: 8 * time.Second, - HeartbeatTimeout: 3 * time.Second, - CancelRequested: true, - CancelRequestID: 4, - TimerTaskStatus: 5, - Attempt: 6, - TaskList: "test_task_list", - StartedIdentity: "test_identity", - HasRetryPolicy: true, - RetryInitialInterval: 1 * time.Second, - RetryMaximumInterval: 30 * time.Second, - RetryMaximumAttempts: 3, - RetryExpirationTimestamp: now, - RetryBackoffCoefficient: 2.0, - RetryNonRetryableErrors: []string{"test_error"}, - RetryLastFailureReason: "test_failure_reason", - RetryLastWorkerIdentity: "test_worker_identity", - RetryLastFailureDetails: []byte("test_failure_details"), - }, - }, - { - name: "ChildExecutionInfo", - data: &ChildExecutionInfo{ - Version: 1, - InitiatedEventBatchID: 2, - StartedID: 3, - InitiatedEvent: []byte("test_initiated_event"), - InitiatedEventEncoding: "test_initiated_encoding", - StartedWorkflowID: "test_started_workflow_id", - StartedRunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - StartedEvent: []byte("test_started_event"), - StartedEventEncoding: "test_started_encoding", - CreateRequestID: "test_create_request_id", - DomainID: "test_domain_id", - DomainNameDEPRECATED: "test_domain_name", - WorkflowTypeName: "test_workflow_type", - ParentClosePolicy: 4, - }, - }, - { - name: "SignalInfo", - data: &SignalInfo{ - Version: 1, - InitiatedEventBatchID: 2, - RequestID: "test_request_id", - Name: "test_signal_name", - Input: []byte("test_input"), - Control: []byte("test_control"), - }, - }, - { - name: "RequestCancelInfo", - data: &RequestCancelInfo{ - Version: 1, - InitiatedEventBatchID: 2, - CancelRequestID: "test_cancel_request_id", - }, - }, - { - name: "TimerInfo", - data: &TimerInfo{ - Version: 1, - StartedID: 2, - ExpiryTimestamp: now, - TaskID: 3, - }, - }, - { - name: "TaskInfo", - data: &TaskInfo{ - WorkflowID: "test_workflow_id", - RunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - ScheduleID: 1, - ExpiryTimestamp: now, - CreatedTimestamp: now, - PartitionConfig: map[string]string{"test_key": "test_value"}, - }, - }, - { - name: "TaskListInfo", - data: &TaskListInfo{ - Kind: 1, - AckLevel: 2, - ExpiryTimestamp: now, - LastUpdated: now, - }, - }, - { - name: "TransferTaskInfo", - data: &TransferTaskInfo{ - DomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - WorkflowID: "test_workflow_id", - RunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - TaskType: 1, - TargetDomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - TargetDomainIDs: []UUID{MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8")}, - TargetWorkflowID: "test_target_workflow_id", - TargetRunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - TaskList: "test_task_list", - TargetChildWorkflowOnly: true, - ScheduleID: 2, - Version: 3, - VisibilityTimestamp: now, - }, - }, - { - name: "TimerTaskInfo", - data: &TimerTaskInfo{ - DomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - WorkflowID: "test_workflow_id", - RunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - TaskType: 1, - TimeoutType: common.Int16Ptr(2), - Version: 3, - ScheduleAttempt: 4, - EventID: 5, - }, - }, - { - name: "ReplicationTaskInfo", - data: &ReplicationTaskInfo{ - DomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - WorkflowID: "test_workflow_id", - RunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - TaskType: 1, - Version: 2, - FirstEventID: 3, - NextEventID: 4, - ScheduledID: 5, - EventStoreVersion: 6, - NewRunEventStoreVersion: 7, - BranchToken: []byte("test_branch_token"), - NewRunBranchToken: []byte("test_new_run_branch_token"), - CreationTimestamp: now, - }, - }, + {name: "ShardInfo", data: shardInfoTestData}, + {name: "DomainInfo", data: domainInfoTestData}, + {name: "HistoryTreeInfo", data: historyTreeInfoTestData}, + {name: "WorkflowExecutionInfo", data: workflowExecutionInfoTestData}, + {name: "ActivityInfo", data: activityInfoTestData}, + {name: "ChildExecutionInfo", data: childExecutionInfoTestData}, + {name: "SignalInfo", data: signalInfoTestData}, + {name: "RequestCancelInfo", data: requestCancelInfoTestData}, + {name: "TimerInfo", data: timerInfoTestData}, + {name: "TaskInfo", data: taskInfoTestData}, + {name: "TaskListInfo", data: taskListInfoTestData}, + {name: "TransferTaskInfo", data: transferTaskInfoTestData}, + {name: "TimerTaskInfo", data: timerTaskInfoTestData}, + {name: "ReplicationTaskInfo", data: replicationTaskInfoTestData}, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - // Encode the data using the parser blob := encodeWithParser(t, parser, tc.data) - - // Decode the data using the parser and verify it matches decoded := decodeWithParser(t, parser, blob, tc.data) assert.Equal(t, tc.data, decoded) }) diff --git a/common/persistence/serialization/snappy_thrift_encoder_test.go b/common/persistence/serialization/snappy_thrift_encoder_test.go index 581e56b2055..b80efc7ebc4 100644 --- a/common/persistence/serialization/snappy_thrift_encoder_test.go +++ b/common/persistence/serialization/snappy_thrift_encoder_test.go @@ -25,23 +25,18 @@ package serialization import ( "fmt" "testing" - "time" "github.com/golang/snappy" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/uber/cadence/common" "github.com/uber/cadence/common/constants" - "github.com/uber/cadence/common/types" ) func TestSnappyThriftEncoderRoundTrip(t *testing.T) { encoder := newSnappyThriftEncoder() decoder := newSnappyThriftDecoder() - now := time.Now().Round(time.Second) - testCases := []struct { name string data interface{} @@ -50,24 +45,7 @@ func TestSnappyThriftEncoderRoundTrip(t *testing.T) { }{ { name: "ShardInfo", - data: &ShardInfo{ - StolenSinceRenew: 1, - UpdatedAt: now, - ReplicationAckLevel: 1, - TransferAckLevel: 1, - TimerAckLevel: now, - DomainNotificationVersion: 1, - ClusterTransferAckLevel: map[string]int64{"test": 1}, - ClusterTimerAckLevel: map[string]time.Time{"test": now}, - TransferProcessingQueueStates: []byte{1, 2, 3}, - TimerProcessingQueueStates: []byte{1, 2, 3}, - Owner: "owner", - ClusterReplicationLevel: map[string]int64{"test": 1}, - PendingFailoverMarkers: []byte{2, 3, 4}, - PendingFailoverMarkersEncoding: "", - TransferProcessingQueueStatesEncoding: "", - TimerProcessingQueueStatesEncoding: "", - }, + data: shardInfoTestData, encodeFunc: func(data interface{}) ([]byte, error) { return encoder.shardInfoToBlob(data.(*ShardInfo)) }, @@ -77,29 +55,7 @@ func TestSnappyThriftEncoderRoundTrip(t *testing.T) { }, { name: "DomainInfo", - data: &DomainInfo{ - Name: "test", - Description: "test_desc", - Owner: "test_owner", - Status: 1, - Retention: 48 * time.Hour, - EmitMetric: true, - ArchivalBucket: "test_bucket", - ArchivalStatus: 1, - ConfigVersion: 1, - FailoverVersion: 1, - NotificationVersion: 1, - FailoverNotificationVersion: 1, - ActiveClusterName: "test_active_cluster", - Clusters: []string{"test_active_cluster", "test_standby_cluster"}, - Data: map[string]string{"test_key": "test_value"}, - BadBinaries: []byte{1, 2, 3}, - BadBinariesEncoding: "", - HistoryArchivalStatus: 1, - HistoryArchivalURI: "test_history_archival_uri", - VisibilityArchivalStatus: 1, - VisibilityArchivalURI: "test_visibility_archival_uri", - }, + data: domainInfoTestData, encodeFunc: func(data interface{}) ([]byte, error) { return encoder.domainInfoToBlob(data.(*DomainInfo)) }, @@ -109,18 +65,7 @@ func TestSnappyThriftEncoderRoundTrip(t *testing.T) { }, { name: "HistoryTreeInfo", - data: &HistoryTreeInfo{ - CreatedTimestamp: now, - Ancestors: []*types.HistoryBranchRange{ - { - BranchID: "test_branch_id1", - }, - { - BranchID: "test_branch_id2", - }, - }, - Info: "test_info", - }, + data: historyTreeInfoTestData, encodeFunc: func(data interface{}) ([]byte, error) { return encoder.historyTreeInfoToBlob(data.(*HistoryTreeInfo)) }, @@ -130,68 +75,7 @@ func TestSnappyThriftEncoderRoundTrip(t *testing.T) { }, { name: "WorkflowExecutionInfo", - data: &WorkflowExecutionInfo{ - ParentDomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - ParentWorkflowID: "test_parent_workflow_id", - ParentRunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - InitiatedID: 1, - CompletionEventBatchID: common.Int64Ptr(2), - CompletionEvent: []byte("test_completion_event"), - CompletionEventEncoding: "test_completion_event_encoding", - TaskList: "test_task_list", - WorkflowTypeName: "test_workflow_type", - WorkflowTimeout: 10 * time.Second, - DecisionTaskTimeout: 5 * time.Second, - ExecutionContext: []byte("test_execution_context"), - State: 1, - CloseStatus: 1, - StartVersion: 1, - LastWriteEventID: common.Int64Ptr(3), - LastEventTaskID: 4, - LastFirstEventID: 5, - LastProcessedEvent: 6, - StartTimestamp: now, - LastUpdatedTimestamp: now, - CreateRequestID: "test_create_request_id", - DecisionVersion: 7, - DecisionScheduleID: 8, - DecisionStartedID: 9, - DecisionRequestID: "test_decision_request_id", - DecisionTimeout: 3 * time.Second, - DecisionAttempt: 10, - DecisionStartedTimestamp: now, - DecisionScheduledTimestamp: now, - DecisionOriginalScheduledTimestamp: now, - CancelRequested: true, - CancelRequestID: "test_cancel_request_id", - StickyTaskList: "test_sticky_task_list", - StickyScheduleToStartTimeout: 2 * time.Second, - RetryAttempt: 11, - RetryInitialInterval: 1 * time.Second, - RetryMaximumInterval: 30 * time.Second, - RetryMaximumAttempts: 3, - RetryExpiration: time.Hour, - RetryBackoffCoefficient: 2.0, - RetryExpirationTimestamp: now, - RetryNonRetryableErrors: []string{"test_error"}, - HasRetryPolicy: true, - CronSchedule: "test_cron", - IsCron: true, - EventStoreVersion: 12, - EventBranchToken: []byte("test_branch_token"), - SignalCount: 13, - HistorySize: 14, - ClientLibraryVersion: "test_client_version", - ClientFeatureVersion: "test_feature_version", - ClientImpl: "test_client_impl", - AutoResetPoints: []byte("test_reset_points"), - AutoResetPointsEncoding: "test_reset_points_encoding", - SearchAttributes: map[string][]byte{"test_key": []byte("test_value")}, - Memo: map[string][]byte{"test_memo": []byte("test_memo_value")}, - VersionHistories: []byte("test_version_histories"), - VersionHistoriesEncoding: "test_version_histories_encoding", - FirstExecutionRunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - }, + data: workflowExecutionInfoTestData, encodeFunc: func(data interface{}) ([]byte, error) { return encoder.workflowExecutionInfoToBlob(data.(*WorkflowExecutionInfo)) }, @@ -201,39 +85,7 @@ func TestSnappyThriftEncoderRoundTrip(t *testing.T) { }, { name: "ActivityInfo", - data: &ActivityInfo{ - Version: 1, - ScheduledEventBatchID: 2, - ScheduledEvent: []byte("test_scheduled_event"), - ScheduledEventEncoding: "test_scheduled_encoding", - ScheduledTimestamp: now, - StartedID: 3, - StartedEvent: []byte("test_started_event"), - StartedEventEncoding: "test_started_encoding", - StartedTimestamp: now, - ActivityID: "test_activity_id", - RequestID: "test_request_id", - ScheduleToStartTimeout: 5 * time.Second, - ScheduleToCloseTimeout: 10 * time.Second, - StartToCloseTimeout: 8 * time.Second, - HeartbeatTimeout: 3 * time.Second, - CancelRequested: true, - CancelRequestID: 4, - TimerTaskStatus: 5, - Attempt: 6, - TaskList: "test_task_list", - StartedIdentity: "test_identity", - HasRetryPolicy: true, - RetryInitialInterval: 1 * time.Second, - RetryMaximumInterval: 30 * time.Second, - RetryMaximumAttempts: 3, - RetryExpirationTimestamp: now, - RetryBackoffCoefficient: 2.0, - RetryNonRetryableErrors: []string{"test_error"}, - RetryLastFailureReason: "test_failure_reason", - RetryLastWorkerIdentity: "test_worker_identity", - RetryLastFailureDetails: []byte("test_failure_details"), - }, + data: activityInfoTestData, encodeFunc: func(data interface{}) ([]byte, error) { return encoder.activityInfoToBlob(data.(*ActivityInfo)) }, @@ -243,22 +95,7 @@ func TestSnappyThriftEncoderRoundTrip(t *testing.T) { }, { name: "ChildExecutionInfo", - data: &ChildExecutionInfo{ - Version: 1, - InitiatedEventBatchID: 2, - StartedID: 3, - InitiatedEvent: []byte("test_initiated_event"), - InitiatedEventEncoding: "test_initiated_encoding", - StartedWorkflowID: "test_started_workflow_id", - StartedRunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - StartedEvent: []byte("test_started_event"), - StartedEventEncoding: "test_started_encoding", - CreateRequestID: "test_create_request_id", - DomainID: "test_domain_id", - DomainNameDEPRECATED: "test_domain_name", - WorkflowTypeName: "test_workflow_type", - ParentClosePolicy: 4, - }, + data: childExecutionInfoTestData, encodeFunc: func(data interface{}) ([]byte, error) { return encoder.childExecutionInfoToBlob(data.(*ChildExecutionInfo)) }, @@ -268,14 +105,7 @@ func TestSnappyThriftEncoderRoundTrip(t *testing.T) { }, { name: "SignalInfo", - data: &SignalInfo{ - Version: 1, - InitiatedEventBatchID: 2, - RequestID: "test_request_id", - Name: "test_signal_name", - Input: []byte("test_input"), - Control: []byte("test_control"), - }, + data: signalInfoTestData, encodeFunc: func(data interface{}) ([]byte, error) { return encoder.signalInfoToBlob(data.(*SignalInfo)) }, @@ -285,11 +115,7 @@ func TestSnappyThriftEncoderRoundTrip(t *testing.T) { }, { name: "RequestCancelInfo", - data: &RequestCancelInfo{ - Version: 1, - InitiatedEventBatchID: 2, - CancelRequestID: "test_cancel_request_id", - }, + data: requestCancelInfoTestData, encodeFunc: func(data interface{}) ([]byte, error) { return encoder.requestCancelInfoToBlob(data.(*RequestCancelInfo)) }, @@ -299,12 +125,7 @@ func TestSnappyThriftEncoderRoundTrip(t *testing.T) { }, { name: "TimerInfo", - data: &TimerInfo{ - Version: 1, - StartedID: 2, - ExpiryTimestamp: now, - TaskID: 3, - }, + data: timerInfoTestData, encodeFunc: func(data interface{}) ([]byte, error) { return encoder.timerInfoToBlob(data.(*TimerInfo)) }, @@ -314,14 +135,7 @@ func TestSnappyThriftEncoderRoundTrip(t *testing.T) { }, { name: "TaskInfo", - data: &TaskInfo{ - WorkflowID: "test_workflow_id", - RunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - ScheduleID: 1, - ExpiryTimestamp: now, - CreatedTimestamp: now, - PartitionConfig: map[string]string{"test_key": "test_value"}, - }, + data: taskInfoTestData, encodeFunc: func(data interface{}) ([]byte, error) { return encoder.taskInfoToBlob(data.(*TaskInfo)) }, @@ -331,12 +145,7 @@ func TestSnappyThriftEncoderRoundTrip(t *testing.T) { }, { name: "TaskListInfo", - data: &TaskListInfo{ - Kind: 1, - AckLevel: 2, - ExpiryTimestamp: now, - LastUpdated: now, - }, + data: taskListInfoTestData, encodeFunc: func(data interface{}) ([]byte, error) { return encoder.taskListInfoToBlob(data.(*TaskListInfo)) }, @@ -346,21 +155,7 @@ func TestSnappyThriftEncoderRoundTrip(t *testing.T) { }, { name: "TransferTaskInfo", - data: &TransferTaskInfo{ - DomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - WorkflowID: "test_workflow_id", - RunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - TaskType: 1, - TargetDomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - TargetDomainIDs: []UUID{MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8")}, - TargetWorkflowID: "test_target_workflow_id", - TargetRunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - TaskList: "test_task_list", - TargetChildWorkflowOnly: true, - ScheduleID: 2, - Version: 3, - VisibilityTimestamp: now, - }, + data: transferTaskInfoTestData, encodeFunc: func(data interface{}) ([]byte, error) { return encoder.transferTaskInfoToBlob(data.(*TransferTaskInfo)) }, @@ -370,16 +165,7 @@ func TestSnappyThriftEncoderRoundTrip(t *testing.T) { }, { name: "TimerTaskInfo", - data: &TimerTaskInfo{ - DomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - WorkflowID: "test_workflow_id", - RunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - TaskType: 1, - TimeoutType: common.Int16Ptr(2), - Version: 3, - ScheduleAttempt: 4, - EventID: 5, - }, + data: timerTaskInfoTestData, encodeFunc: func(data interface{}) ([]byte, error) { return encoder.timerTaskInfoToBlob(data.(*TimerTaskInfo)) }, @@ -389,21 +175,7 @@ func TestSnappyThriftEncoderRoundTrip(t *testing.T) { }, { name: "ReplicationTaskInfo", - data: &ReplicationTaskInfo{ - DomainID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - WorkflowID: "test_workflow_id", - RunID: MustParseUUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8"), - TaskType: 1, - Version: 2, - FirstEventID: 3, - NextEventID: 4, - ScheduledID: 5, - EventStoreVersion: 6, - NewRunEventStoreVersion: 7, - BranchToken: []byte("test_branch_token"), - NewRunBranchToken: []byte("test_new_run_branch_token"), - CreationTimestamp: now, - }, + data: replicationTaskInfoTestData, encodeFunc: func(data interface{}) ([]byte, error) { return encoder.replicationTaskInfoToBlob(data.(*ReplicationTaskInfo)) }, @@ -579,12 +351,7 @@ func TestSnappyThriftRWEncode(t *testing.T) { t.Run("Valid thrift object", func(t *testing.T) { // Create a simple thrift struct and encode it - shardInfo := &ShardInfo{ - StolenSinceRenew: 1, - UpdatedAt: time.Now(), - ReplicationAckLevel: 2, - TransferAckLevel: 3, - } + shardInfo := shardInfoTestData // Convert to thrift format thriftStruct := shardInfoToThrift(shardInfo) @@ -639,17 +406,7 @@ func TestSnappyThriftEncoderWithParser(t *testing.T) { parser, err := NewParser(constants.EncodingTypeThriftRWSnappy, constants.EncodingTypeThriftRWSnappy) require.NoError(t, err) - now := time.Now().Round(time.Second) - - testData := &ShardInfo{ - StolenSinceRenew: 1, - UpdatedAt: now, - ReplicationAckLevel: 2, - TransferAckLevel: 3, - TimerAckLevel: now, - DomainNotificationVersion: 4, - Owner: "test_owner", - } + testData := shardInfoTestData // Encode using parser blob, err := parser.ShardInfoToBlob(testData) From aeba827dba9c243607fd4529fdd2da15f6098a76 Mon Sep 17 00:00:00 2001 From: Ignat Tubylov Date: Thu, 7 Aug 2025 14:15:32 +0200 Subject: [PATCH 3/5] Add feature flag for serialization encoding --- .../dynamicproperties/constants.go | 11 ++ common/persistence/client/factory.go | 34 +--- common/persistence/config.go | 2 + .../persistence-tests/persistenceTestBase.go | 1 + common/persistence/serialization/parser.go | 186 ++++++++++++++---- .../persistence/serialization/parser_test.go | 11 +- .../snappy_thrift_decoder_test.go | 7 +- .../snappy_thrift_encoder_test.go | 7 +- .../serialization/task_serializer_test.go | 6 +- common/persistence/sql/sql_task_store_test.go | 6 +- 10 files changed, 201 insertions(+), 70 deletions(-) diff --git a/common/dynamicconfig/dynamicproperties/constants.go b/common/dynamicconfig/dynamicproperties/constants.go index 3a2f87faf87..c52cea85728 100644 --- a/common/dynamicconfig/dynamicproperties/constants.go +++ b/common/dynamicconfig/dynamicproperties/constants.go @@ -2405,6 +2405,12 @@ const ( // LastStringKey must be the last one in this const group LastStringKey + + // SerializationEncoding is the encoding type for blobs + // KeyName: history.serializationEncoding + // Value type: String + // Default value: "thriftrw" + SerializationEncoding ) const ( @@ -4865,6 +4871,11 @@ var StringKeys = map[StringKey]DynamicString{ Description: "MatchingShardDistributionMode defines which shard distribution mode should be used", DefaultValue: "hash_ring", }, + SerializationEncoding: { + KeyName: "history.serializationEncoding", + Description: "SerializationEncoding is the encoding type for blobs", + DefaultValue: string(constants.EncodingTypeThriftRW), + }, } var DurationKeys = map[DurationKey]DynamicDuration{ diff --git a/common/persistence/client/factory.go b/common/persistence/client/factory.go index 5fc976f4df2..a37216c5fad 100644 --- a/common/persistence/client/factory.go +++ b/common/persistence/client/factory.go @@ -526,12 +526,12 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite defaultDataStore := Datastore{ratelimit: limiters[f.config.DefaultStore]} switch { case defaultCfg.NoSQL != nil: - parser := getParser(f.logger, constants.EncodingTypeThriftRW, constants.EncodingTypeThriftRW) + parser := f.getParser() taskSerializer := serialization.NewTaskSerializer(parser) shardedNoSQLConfig := defaultCfg.NoSQL.ConvertToShardedNoSQLConfig() defaultDataStore.factory = nosql.NewFactory(*shardedNoSQLConfig, clusterName, f.logger, f.metricsClient, taskSerializer, parser, f.dc) case defaultCfg.ShardedNoSQL != nil: - parser := getParser(f.logger, constants.EncodingTypeThriftRW, constants.EncodingTypeThriftRW) + parser := f.getParser() taskSerializer := serialization.NewTaskSerializer(parser) defaultDataStore.factory = nosql.NewFactory(*defaultCfg.ShardedNoSQL, clusterName, f.logger, f.metricsClient, taskSerializer, parser, f.dc) case defaultCfg.SQL != nil: @@ -543,16 +543,7 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite string(constants.EncodingTypeThriftRW), } } - var decodingTypes []constants.EncodingType - for _, dt := range defaultCfg.SQL.DecodingTypes { - decodingTypes = append(decodingTypes, constants.EncodingType(dt)) - } - defaultDataStore.factory = sql.NewFactory( - *defaultCfg.SQL, - clusterName, - f.logger, - getParser(f.logger, constants.EncodingType(defaultCfg.SQL.EncodingType), decodingTypes...), - f.dc) + defaultDataStore.factory = sql.NewFactory(*defaultCfg.SQL, clusterName, f.logger, f.getParser(), f.dc) default: f.logger.Fatal("invalid config: one of nosql or sql params must be specified for defaultDataStore") } @@ -576,21 +567,12 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite visibilityDataStore := Datastore{ratelimit: limiters[f.config.VisibilityStore]} switch { case visibilityCfg.NoSQL != nil: - parser := getParser(f.logger, constants.EncodingTypeThriftRW, constants.EncodingTypeThriftRW) + parser := f.getParser() taskSerializer := serialization.NewTaskSerializer(parser) shardedNoSQLConfig := visibilityCfg.NoSQL.ConvertToShardedNoSQLConfig() visibilityDataStore.factory = nosql.NewFactory(*shardedNoSQLConfig, clusterName, f.logger, f.metricsClient, taskSerializer, parser, f.dc) case visibilityCfg.SQL != nil: - var decodingTypes []constants.EncodingType - for _, dt := range visibilityCfg.SQL.DecodingTypes { - decodingTypes = append(decodingTypes, constants.EncodingType(dt)) - } - visibilityDataStore.factory = sql.NewFactory( - *visibilityCfg.SQL, - clusterName, - f.logger, - getParser(f.logger, constants.EncodingType(visibilityCfg.SQL.EncodingType), decodingTypes...), - f.dc) + visibilityDataStore.factory = sql.NewFactory(*visibilityCfg.SQL, clusterName, f.logger, f.getParser(), f.dc) default: f.logger.Fatal("invalid config: one of nosql or sql params must be specified for visibilityStore") } @@ -598,10 +580,10 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite f.datastores[storeTypeVisibility] = visibilityDataStore } -func getParser(logger log.Logger, encodingType constants.EncodingType, decodingTypes ...constants.EncodingType) serialization.Parser { - parser, err := serialization.NewParser(encodingType, decodingTypes...) +func (f *factoryImpl) getParser() serialization.Parser { + parser, err := serialization.NewParser(f.dc) if err != nil { - logger.Fatal("failed to construct parser", tag.Error(err)) + f.logger.Fatal("failed to construct parser", tag.Error(err)) } return parser } diff --git a/common/persistence/config.go b/common/persistence/config.go index 5db433e26cb..b8db3c90663 100644 --- a/common/persistence/config.go +++ b/common/persistence/config.go @@ -35,6 +35,7 @@ type ( EnableHistoryTaskDualWriteMode dynamicproperties.BoolPropertyFn ReadNoSQLHistoryTaskFromDataBlob dynamicproperties.BoolPropertyFn ReadNoSQLShardFromDataBlob dynamicproperties.BoolPropertyFn + SerializationEncoding dynamicproperties.StringPropertyFn } ) @@ -48,5 +49,6 @@ func NewDynamicConfiguration(dc *dynamicconfig.Collection) *DynamicConfiguration EnableHistoryTaskDualWriteMode: dc.GetBoolProperty(dynamicproperties.EnableNoSQLHistoryTaskDualWriteMode), ReadNoSQLHistoryTaskFromDataBlob: dc.GetBoolProperty(dynamicproperties.ReadNoSQLHistoryTaskFromDataBlob), ReadNoSQLShardFromDataBlob: dc.GetBoolProperty(dynamicproperties.ReadNoSQLShardFromDataBlob), + SerializationEncoding: dc.GetStringProperty(dynamicproperties.SerializationEncoding), } } diff --git a/common/persistence/persistence-tests/persistenceTestBase.go b/common/persistence/persistence-tests/persistenceTestBase.go index c574508a7a9..0232415cae9 100644 --- a/common/persistence/persistence-tests/persistenceTestBase.go +++ b/common/persistence/persistence-tests/persistenceTestBase.go @@ -188,6 +188,7 @@ func NewTestBaseWithSQL(t *testing.T, options *TestBaseOptions) *TestBase { EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true), ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), } params := TestBaseParams{ DefaultTestCluster: testCluster, diff --git a/common/persistence/serialization/parser.go b/common/persistence/serialization/parser.go index 2306c10fb91..7f5931f5ecb 100644 --- a/common/persistence/serialization/parser.go +++ b/common/persistence/serialization/parser.go @@ -31,193 +31,295 @@ import ( type ( parser struct { - encoder encoder + dc *persistence.DynamicConfiguration + encoders map[constants.EncodingType]encoder decoders map[constants.EncodingType]decoder } ) +var allBlobEncodings = []constants.EncodingType{ + constants.EncodingTypeThriftRW, + constants.EncodingTypeThriftRWSnappy, +} + // NewParser constructs a new parser using encoder as specified by encodingType and using decoders specified by decodingTypes -func NewParser(encodingType constants.EncodingType, decodingTypes ...constants.EncodingType) (Parser, error) { - encoder, err := getEncoder(encodingType) - if err != nil { - return nil, err - } +func NewParser(dc *persistence.DynamicConfiguration) (Parser, error) { + encoders := make(map[constants.EncodingType]encoder) decoders := make(map[constants.EncodingType]decoder) - for _, dt := range decodingTypes { + + for _, dt := range allBlobEncodings { decoder, err := getDecoder(dt) if err != nil { return nil, err } decoders[dt] = decoder + + encoder, err := getEncoder(dt) + if err != nil { + return nil, err + } + encoders[dt] = encoder } return &parser{ - encoder: encoder, + dc: dc, + encoders: encoders, decoders: decoders, }, nil } func (p *parser) ShardInfoToBlob(info *ShardInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.shardInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) if err != nil { return db, err } + + data, err := encoder.shardInfoToBlob(info) + if err != nil { + return db, err + } + db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) DomainInfoToBlob(info *DomainInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.domainInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.domainInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) HistoryTreeInfoToBlob(info *HistoryTreeInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.historyTreeInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.historyTreeInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) WorkflowExecutionInfoToBlob(info *WorkflowExecutionInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.workflowExecutionInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.workflowExecutionInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) ActivityInfoToBlob(info *ActivityInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.activityInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.activityInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) ChildExecutionInfoToBlob(info *ChildExecutionInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.childExecutionInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.childExecutionInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) SignalInfoToBlob(info *SignalInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.signalInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.signalInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) RequestCancelInfoToBlob(info *RequestCancelInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.requestCancelInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.requestCancelInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) TimerInfoToBlob(info *TimerInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.timerInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.timerInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) TaskInfoToBlob(info *TaskInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.taskInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.taskInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) TaskListInfoToBlob(info *TaskListInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.taskListInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.taskListInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) TransferTaskInfoToBlob(info *TransferTaskInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.transferTaskInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.transferTaskInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) CrossClusterTaskInfoToBlob(info *CrossClusterTaskInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.crossClusterTaskInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.crossClusterTaskInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) TimerTaskInfoToBlob(info *TimerTaskInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.timerTaskInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.timerTaskInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } func (p *parser) ReplicationTaskInfoToBlob(info *ReplicationTaskInfo) (persistence.DataBlob, error) { db := persistence.DataBlob{} - data, err := p.encoder.replicationTaskInfoToBlob(info) + encoding := p.dc.SerializationEncoding() + encoder, err := p.getCachedEncoder(constants.EncodingType(encoding)) + if err != nil { + return db, err + } + + data, err := encoder.replicationTaskInfoToBlob(info) if err != nil { return db, err } db.Data = data - db.Encoding = p.encoder.encodingType() + db.Encoding = encoder.encodingType() return db, nil } @@ -341,6 +443,14 @@ func (p *parser) ReplicationTaskInfoFromBlob(data []byte, encoding string) (*Rep return decoder.replicationTaskInfoFromBlob(data) } +func (p *parser) getCachedEncoder(encoding constants.EncodingType) (encoder, error) { + encoder, ok := p.encoders[encoding] + if !ok { + return nil, unsupportedEncodingError(encoding) + } + return encoder, nil +} + func (p *parser) getCachedDecoder(encoding constants.EncodingType) (decoder, error) { decoder, ok := p.decoders[encoding] if !ok { diff --git a/common/persistence/serialization/parser_test.go b/common/persistence/serialization/parser_test.go index 622bb8ee07e..f2cca2a6945 100644 --- a/common/persistence/serialization/parser_test.go +++ b/common/persistence/serialization/parser_test.go @@ -33,12 +33,16 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/constants" + "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" ) func TestParserRoundTrip(t *testing.T) { - thriftParser, err := NewParser(constants.EncodingTypeThriftRW, constants.EncodingTypeThriftRW) + dc := &persistence.DynamicConfiguration{ + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), + } + thriftParser, err := NewParser(dc) assert.NoError(t, err) now := time.Now().Round(time.Second) @@ -355,7 +359,10 @@ func TestParser_WorkflowExecution_with_cron(t *testing.T) { CronSchedule: "@every 1m", IsCron: true, } - parser, err := NewParser(constants.EncodingTypeThriftRW, constants.EncodingTypeThriftRW) + dc := &persistence.DynamicConfiguration{ + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), + } + parser, err := NewParser(dc) require.NoError(t, err) blob, err := parser.WorkflowExecutionInfoToBlob(info) require.NoError(t, err) diff --git a/common/persistence/serialization/snappy_thrift_decoder_test.go b/common/persistence/serialization/snappy_thrift_decoder_test.go index 4f5456d8962..526b4690746 100644 --- a/common/persistence/serialization/snappy_thrift_decoder_test.go +++ b/common/persistence/serialization/snappy_thrift_decoder_test.go @@ -31,10 +31,15 @@ import ( "github.com/stretchr/testify/require" "github.com/uber/cadence/common/constants" + "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" + "github.com/uber/cadence/common/persistence" ) func TestSnappyThriftDecoderRoundTrip(t *testing.T) { - parser, err := NewParser(constants.EncodingTypeThriftRWSnappy, constants.EncodingTypeThriftRWSnappy) + dc := &persistence.DynamicConfiguration{ + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRWSnappy)), + } + parser, err := NewParser(dc) require.NoError(t, err) testCases := []struct { diff --git a/common/persistence/serialization/snappy_thrift_encoder_test.go b/common/persistence/serialization/snappy_thrift_encoder_test.go index b80efc7ebc4..d5b69a87c95 100644 --- a/common/persistence/serialization/snappy_thrift_encoder_test.go +++ b/common/persistence/serialization/snappy_thrift_encoder_test.go @@ -31,6 +31,8 @@ import ( "github.com/stretchr/testify/require" "github.com/uber/cadence/common/constants" + "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" + "github.com/uber/cadence/common/persistence" ) func TestSnappyThriftEncoderRoundTrip(t *testing.T) { @@ -402,8 +404,11 @@ func TestSnappyThriftRWEncode(t *testing.T) { } func TestSnappyThriftEncoderWithParser(t *testing.T) { + dc := &persistence.DynamicConfiguration{ + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRWSnappy)), + } // Test encoder integration with parser - parser, err := NewParser(constants.EncodingTypeThriftRWSnappy, constants.EncodingTypeThriftRWSnappy) + parser, err := NewParser(dc) require.NoError(t, err) testData := shardInfoTestData diff --git a/common/persistence/serialization/task_serializer_test.go b/common/persistence/serialization/task_serializer_test.go index b077c8b50d5..d462fbdbcd3 100644 --- a/common/persistence/serialization/task_serializer_test.go +++ b/common/persistence/serialization/task_serializer_test.go @@ -30,11 +30,15 @@ import ( "github.com/stretchr/testify/require" "github.com/uber/cadence/common/constants" + "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/persistence" ) func TestTaskSerializerThriftRW(t *testing.T) { - parser, err := NewParser(constants.EncodingTypeThriftRW, constants.EncodingTypeThriftRW) + dc := &persistence.DynamicConfiguration{ + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), + } + parser, err := NewParser(dc) require.NoError(t, err) taskSerializer := NewTaskSerializer(parser) diff --git a/common/persistence/sql/sql_task_store_test.go b/common/persistence/sql/sql_task_store_test.go index 11d0aa99e02..37cc070712b 100644 --- a/common/persistence/sql/sql_task_store_test.go +++ b/common/persistence/sql/sql_task_store_test.go @@ -37,6 +37,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/constants" + "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/persistence/serialization" "github.com/uber/cadence/common/persistence/sql/sqlplugin" @@ -1721,7 +1722,10 @@ func TestHelp(t *testing.T) { shard := sqlplugin.GetDBShardIDFromDomainIDAndTasklist("c4b5cb22-c213-4812-bb4a-fc1ade5405ef", "pgtasklist", 16384) println("shard: ", shard) // BgAKAAAKAAwAAAAABQ+kWAoADgAAAAAAAAAACgAQGB6uFsU9cvEMABIKAAoAAAAAAAAAAQgADAAAAAAIAA4AAAAAAAA= - parser, err := serialization.NewParser(constants.EncodingTypeThriftRW, constants.EncodingTypeThriftRW) + dc := &persistence.DynamicConfiguration{ + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), + } + parser, err := serialization.NewParser(dc) require.NoError(t, err) data, err := base64.StdEncoding.DecodeString("BgAKAAAKAAwAAAAABGGFYAoADgAAAAAAAAAACgAQGB6uGPaVqOUMABIKAAoAAAAAAAAAAQgADAAAAAIIAA4AAAACAAA=") require.NoError(t, err) From 218091076675976cce5a684e6e4cb010f24033f0 Mon Sep 17 00:00:00 2001 From: Ignat Tubylov Date: Fri, 8 Aug 2025 15:19:06 +0200 Subject: [PATCH 4/5] Fix tests --- common/persistence/persistence-tests/persistenceTestBase.go | 1 + 1 file changed, 1 insertion(+) diff --git a/common/persistence/persistence-tests/persistenceTestBase.go b/common/persistence/persistence-tests/persistenceTestBase.go index 0232415cae9..113fe3fa31a 100644 --- a/common/persistence/persistence-tests/persistenceTestBase.go +++ b/common/persistence/persistence-tests/persistenceTestBase.go @@ -157,6 +157,7 @@ func NewTestBaseWithNoSQL(t *testing.T, options *TestBaseOptions) *TestBase { EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true), ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), } params := TestBaseParams{ DefaultTestCluster: testCluster, From 7c4d7931e3f349896afb3561b6ebefdd739051ff Mon Sep 17 00:00:00 2001 From: Ignat Tubylov Date: Mon, 11 Aug 2025 19:50:48 +0200 Subject: [PATCH 5/5] Fix tests --- host/async_wf_test.go | 2 ++ host/integrationbase.go | 1 + host/ndc/integration_test.go | 1 + host/pinot_test.go | 2 ++ host/workflowidratelimit_test.go | 1 + host/workflowsidinternalratelimit_test.go | 2 ++ simulation/history/history_simulation_test.go | 2 ++ simulation/matching/matching_simulation_test.go | 1 + 8 files changed, 12 insertions(+) diff --git a/host/async_wf_test.go b/host/async_wf_test.go index 4bedf7859a3..da19afe614f 100644 --- a/host/async_wf_test.go +++ b/host/async_wf_test.go @@ -52,6 +52,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/constants" "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/persistence" pt "github.com/uber/cadence/common/persistence/persistence-tests" @@ -99,6 +100,7 @@ func (s *AsyncWFIntegrationSuite) SetupSuite() { EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true), ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), } params := pt.TestBaseParams{ DefaultTestCluster: s.DefaultTestCluster, diff --git a/host/integrationbase.go b/host/integrationbase.go index 6108cca9dd7..37e85e5c247 100644 --- a/host/integrationbase.go +++ b/host/integrationbase.go @@ -115,6 +115,7 @@ func (s *IntegrationBase) setupSuite() { EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true), ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), } params := pt.TestBaseParams{ DefaultTestCluster: s.DefaultTestCluster, diff --git a/host/ndc/integration_test.go b/host/ndc/integration_test.go index a197af47298..bb0f98b7924 100644 --- a/host/ndc/integration_test.go +++ b/host/ndc/integration_test.go @@ -103,6 +103,7 @@ func (s *NDCIntegrationTestSuite) SetupSuite() { EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true), ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), } params := pt.TestBaseParams{ DefaultTestCluster: s.defaultTestCluster, diff --git a/host/pinot_test.go b/host/pinot_test.go index 45b87c230b2..8f2cff079ec 100644 --- a/host/pinot_test.go +++ b/host/pinot_test.go @@ -56,6 +56,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/constants" "github.com/uber/cadence/common/definition" "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/log" @@ -114,6 +115,7 @@ func (s *PinotIntegrationSuite) SetupSuite() { EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true), ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), } params := pt.TestBaseParams{ DefaultTestCluster: s.DefaultTestCluster, diff --git a/host/workflowidratelimit_test.go b/host/workflowidratelimit_test.go index 61965b86453..55a6b68089e 100644 --- a/host/workflowidratelimit_test.go +++ b/host/workflowidratelimit_test.go @@ -77,6 +77,7 @@ func (s *WorkflowIDRateLimitIntegrationSuite) SetupSuite() { EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true), ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), } params := pt.TestBaseParams{ DefaultTestCluster: s.DefaultTestCluster, diff --git a/host/workflowsidinternalratelimit_test.go b/host/workflowsidinternalratelimit_test.go index ec498851695..eb82a170aa5 100644 --- a/host/workflowsidinternalratelimit_test.go +++ b/host/workflowsidinternalratelimit_test.go @@ -36,6 +36,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/constants" "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/persistence" @@ -80,6 +81,7 @@ func (s *WorkflowIDInternalRateLimitIntegrationSuite) SetupSuite() { EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true), ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), } params := pt.TestBaseParams{ DefaultTestCluster: s.DefaultTestCluster, diff --git a/simulation/history/history_simulation_test.go b/simulation/history/history_simulation_test.go index 27e0d12ad45..48b73dc5b91 100644 --- a/simulation/history/history_simulation_test.go +++ b/simulation/history/history_simulation_test.go @@ -17,6 +17,7 @@ import ( "go.uber.org/yarpc" "go.uber.org/yarpc/transport/grpc" + "github.com/uber/cadence/common/constants" "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/persistence" @@ -75,6 +76,7 @@ func (s *HistorySimulationSuite) SetupSuite() { EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true), ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false), + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), } params := pt.TestBaseParams{ DefaultTestCluster: s.DefaultTestCluster, diff --git a/simulation/matching/matching_simulation_test.go b/simulation/matching/matching_simulation_test.go index e345c881512..9c37554e0ef 100644 --- a/simulation/matching/matching_simulation_test.go +++ b/simulation/matching/matching_simulation_test.go @@ -168,6 +168,7 @@ func (s *MatchingSimulationSuite) SetupSuite() { PersistenceSampleLoggingRate: dynamicproperties.GetIntPropertyFn(100), EnableShardIDMetrics: dynamicproperties.GetBoolPropertyFn(true), EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true), + SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)), } params := pt.TestBaseParams{ DefaultTestCluster: s.DefaultTestCluster,