Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions common/namespace/nsreplication/data_merger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package nsreplication

// NamespaceDataMerger provides custom merge logic for namespace data
// during replication task execution.
type NamespaceDataMerger interface {
// MergeData performs a business specific merge of namespace data.
MergeData(currentData, taskData map[string]string) (mergedData map[string]string, merged bool)
}

// NoopDataMerger is the default implementation that returns task data directly.
type NoopDataMerger struct{}

// NewNoopDataMerger creates a new NoopDataMerger.
func NewNoopDataMerger() NamespaceDataMerger {
return &NoopDataMerger{}
}

// MergeData returns taskData directly without any merging.
func (n *NoopDataMerger) MergeData(currentData, taskData map[string]string) (map[string]string, bool) {
return taskData, false
}
16 changes: 15 additions & 1 deletion common/namespace/nsreplication/replication_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type (
taskExecutorImpl struct {
currentCluster string
metadataManager persistence.MetadataManager
dataMerger NamespaceDataMerger
logger log.Logger
}
)
Expand All @@ -58,12 +59,14 @@ type (
func NewTaskExecutor(
currentCluster string,
metadataManagerV2 persistence.MetadataManager,
dataMerger NamespaceDataMerger,
logger log.Logger,
) TaskExecutor {

return &taskExecutorImpl{
currentCluster: currentCluster,
metadataManager: metadataManagerV2,
dataMerger: dataMerger,
logger: logger,
}
}
Expand Down Expand Up @@ -266,15 +269,26 @@ func (h *taskExecutorImpl) handleNamespaceUpdateReplicationTask(
IsGlobalNamespace: resp.IsGlobalNamespace,
}

mergedData, dataMerged := h.dataMerger.MergeData(resp.Namespace.Info.Data, task.Info.Data)
if dataMerged {
recordUpdated = true
request.Namespace.Info.Data = mergedData
}

if resp.Namespace.ConfigVersion < task.GetConfigVersion() {
recordUpdated = true
// Use merged data if available, otherwise use task data
data := task.Info.Data
if dataMerged {
data = mergedData
}
request.Namespace.Info = &persistencespb.NamespaceInfo{
Id: task.GetId(),
Name: task.Info.GetName(),
State: task.Info.GetState(),
Description: task.Info.GetDescription(),
Owner: task.Info.GetOwnerEmail(),
Data: task.Info.Data,
Data: data,
}
request.Namespace.Config = &persistencespb.NamespaceConfig{
Retention: task.Config.GetWorkflowExecutionRetentionTtl(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func (s *namespaceReplicationTaskExecutorSuite) SetupTest() {
s.namespaceReplicator = NewTaskExecutor(
"some random standby cluster name",
s.mockMetadataMgr,
NewNoopDataMerger(),
logger,
).(*taskExecutorImpl)
}
Expand Down
14 changes: 9 additions & 5 deletions common/namespace/nsreplication/transmission_task_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type (
failoverVersion int64,
isGlobalNamespace bool,
failoverHistoy []*persistencespb.FailoverStatus,
forceReplicate bool,
) error
}

Expand Down Expand Up @@ -61,13 +62,16 @@ func (r *replicator) HandleTransmissionTask(
failoverVersion int64,
isGlobalNamespace bool,
failoverHistoy []*persistencespb.FailoverStatus,
forceReplicate bool,
) error {

if !isGlobalNamespace {
return nil
}
if len(replicationConfig.Clusters) <= 1 && !replicationClusterListUpdated {
return nil
if !forceReplicate {
if !isGlobalNamespace {
return nil
}
if len(replicationConfig.Clusters) <= 1 && !replicationClusterListUpdated {
return nil
}
}
if info.State == enumspb.NAMESPACE_STATE_DELETED {
// Don't replicate deleted namespace changes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func (s *transmissionTaskSuite) TestHandleTransmissionTask_RegisterNamespaceTask
failoverVersion,
isGlobalNamespace,
nil,
false, // forceReplicate
)
s.Nil(err)
}
Expand Down Expand Up @@ -194,6 +195,7 @@ func (s *transmissionTaskSuite) TestHandleTransmissionTask_RegisterNamespaceTask
failoverVersion,
isGlobalNamespace,
nil,
false, // forceReplicate
)
s.Nil(err)
}
Expand Down Expand Up @@ -282,6 +284,7 @@ func (s *transmissionTaskSuite) TestHandleTransmissionTask_UpdateNamespaceTask_I
failoverVersion,
isGlobalNamespace,
nil,
false, // forceReplicate
)
s.Nil(err)
}
Expand Down Expand Up @@ -336,6 +339,7 @@ func (s *transmissionTaskSuite) TestHandleTransmissionTask_UpdateNamespaceTask_N
failoverVersion,
isGlobalNamespace,
nil,
false, // forceReplicate
)
s.Nil(err)
}
Expand Down Expand Up @@ -424,6 +428,7 @@ func (s *transmissionTaskSuite) TestHandleTransmissionTask_UpdateNamespaceTask_R
failoverVersion,
isGlobalNamespace,
nil,
false, // forceReplicate
)
s.Nil(err)

Expand All @@ -438,6 +443,7 @@ func (s *transmissionTaskSuite) TestHandleTransmissionTask_UpdateNamespaceTask_R
failoverVersion,
isGlobalNamespace,
nil,
false, // forceReplicate
)
s.Nil(err)
}
2 changes: 2 additions & 0 deletions service/frontend/admin_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ type (
EventSerializer serialization.Serializer
TimeSource clock.TimeSource
ChasmRegistry *chasm.Registry
NamespaceDataMerger nsreplication.NamespaceDataMerger

// DEPRECATED: only history service on server side is supposed to
// use the following components.
Expand All @@ -166,6 +167,7 @@ func NewAdminHandler(
namespaceReplicationTaskExecutor := nsreplication.NewTaskExecutor(
args.ClusterMetadata.GetCurrentClusterName(),
args.PersistenceMetadataManager,
args.NamespaceDataMerger,
args.Logger,
)

Expand Down
2 changes: 2 additions & 0 deletions service/frontend/admin_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/namespace/nsreplication"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/persistence/visibility/manager"
Expand Down Expand Up @@ -180,6 +181,7 @@ func (s *adminHandlerSuite) SetupTest() {
serialization.NewSerializer(),
clock.NewRealTimeSource(),
chasmRegistry,
nsreplication.NewNoopDataMerger(),
tasks.NewDefaultTaskCategoryRegistry(),
s.mockResource.GetMatchingClient(),
}
Expand Down
60 changes: 60 additions & 0 deletions service/frontend/data_update_checker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package frontend

import (
"context"

"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/common/persistence"
)

// DataUpdateCheckResult contains the result of checking for a data-only update.
type DataUpdateCheckResult struct {
// ModifiedData is the data map to persist (nil if not using fast-path)
ModifiedData map[string]string
// ShouldFastPath is true if only data field should be updated (skip other logic)
ShouldFastPath bool
// ShouldReplicate is true to force replication even for single-cluster namespaces
ShouldReplicate bool
}

// NamespaceDataUpdateChecker determines if a namespace update request
// should be handled as a data-only fast-path update.
type NamespaceDataUpdateChecker interface {
// CheckDataOnlyUpdate checks if the update should bypass standard
// update logic and only update the namespace data field.
//
// Parameters:
// - ctx: context for the operation
// - currentNamespace: the existing namespace from persistence
// - updateRequest: the incoming update request
//
// Returns:
// - result: contains modifiedData, shouldFastPath, and shouldReplicate flags
// - err: error if validation fails (request will be rejected)
//
// When result.ShouldFastPath is true, any other update fields in the request are
// silently dropped and only the modifiedData is persisted.
CheckDataOnlyUpdate(
ctx context.Context,
currentNamespace *persistence.GetNamespaceResponse,
updateRequest *workflowservice.UpdateNamespaceRequest,
) (DataUpdateCheckResult, error)
}

// NoopDataUpdateChecker is the default implementation that never
// triggers the fast-path, allowing all updates to flow through normal logic.
type NoopDataUpdateChecker struct{}

// NewNoopDataUpdateChecker creates a new NoopDataUpdateChecker.
func NewNoopDataUpdateChecker() NamespaceDataUpdateChecker {
return &NoopDataUpdateChecker{}
}

// CheckDataOnlyUpdate always returns an empty result to use the standard update flow.
func (n *NoopDataUpdateChecker) CheckDataOnlyUpdate(
ctx context.Context,
currentNamespace *persistence.GetNamespaceResponse,
updateRequest *workflowservice.UpdateNamespaceRequest,
) (DataUpdateCheckResult, error) {
return DataUpdateCheckResult{}, nil
}
7 changes: 7 additions & 0 deletions service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/namespace/nsreplication"
"go.temporal.io/server/common/nexus"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/serialization"
Expand Down Expand Up @@ -96,6 +97,8 @@ var Module = fx.Options(
fx.Provide(PersistenceRateLimitingParamsProvider),
service.PersistenceLazyLoadedServiceResolverModule,
fx.Provide(FEReplicatorNamespaceReplicationQueueProvider),
fx.Provide(NewNoopDataUpdateChecker),
fx.Provide(nsreplication.NewNoopDataMerger),
fx.Provide(AuthorizationInterceptorProvider),
fx.Provide(NamespaceCheckerProvider),
fx.Provide(func(so GrpcServerOptions) *grpc.Server { return grpc.NewServer(so.Options...) }),
Expand Down Expand Up @@ -686,6 +689,7 @@ func AdminHandlerProvider(
taskCategoryRegistry tasks.TaskCategoryRegistry,
matchingClient resource.MatchingClient,
chasmRegistry *chasm.Registry,
namespaceDataMerger nsreplication.NamespaceDataMerger,
) *AdminHandler {
args := NewAdminHandlerArgs{
persistenceConfig,
Expand Down Expand Up @@ -715,6 +719,7 @@ func AdminHandlerProvider(
eventSerializer,
timeSource,
chasmRegistry,
namespaceDataMerger,
taskCategoryRegistry,
matchingClient,
}
Expand Down Expand Up @@ -789,6 +794,7 @@ func HandlerProvider(
activityHandler activity.FrontendHandler,
registry *chasm.Registry,
frontendServiceResolver membership.ServiceResolver,
dataUpdateChecker NamespaceDataUpdateChecker,
) Handler {
workerDeploymentReadRateLimiter := configs.NewGlobalNamespaceRateLimiter(
frontendServiceResolver,
Expand Down Expand Up @@ -825,6 +831,7 @@ func HandlerProvider(
activityHandler,
registry,
workerDeploymentReadRateLimiter,
dataUpdateChecker,
)
return wfHandler
}
Expand Down
Loading
Loading