99 "github.com/uber/cadence/common/clock"
1010 "github.com/uber/cadence/common/log"
1111 "github.com/uber/cadence/common/log/tag"
12+ "github.com/uber/cadence/common/metrics"
1213 "github.com/uber/cadence/common/types"
1314 "github.com/uber/cadence/service/sharddistributor/config"
1415 "github.com/uber/cadence/service/sharddistributor/store"
@@ -28,6 +29,7 @@ type executor struct {
2829 storage store.Store
2930 shardDistributionCfg config.ShardDistribution
3031 migrationConfiguration * config.MigrationConfig
32+ metricsClient metrics.Client
3133}
3234
3335func NewExecutorHandler (
@@ -36,13 +38,15 @@ func NewExecutorHandler(
3638 timeSource clock.TimeSource ,
3739 shardDistributionCfg config.ShardDistribution ,
3840 migrationConfig * config.MigrationConfig ,
41+ metricsClient metrics.Client ,
3942) Executor {
4043 return & executor {
4144 logger : logger ,
4245 timeSource : timeSource ,
4346 storage : storage ,
4447 shardDistributionCfg : shardDistributionCfg ,
4548 migrationConfiguration : migrationConfig ,
49+ metricsClient : metricsClient ,
4650 }
4751}
4852
@@ -53,8 +57,9 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe
5357 return nil , & types.InternalServiceError {Message : fmt .Sprintf ("failed to get heartbeat: %v" , err )}
5458 }
5559
56- now := h .timeSource .Now ().UTC ()
60+ heartbeatTime := h .timeSource .Now ().UTC ()
5761 mode := h .migrationConfiguration .GetMigrationMode (request .Namespace )
62+ shardAssignedInBackground := true
5863
5964 switch mode {
6065 case types .MigrationModeINVALID :
@@ -68,19 +73,20 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe
6873 if err != nil {
6974 return nil , err
7075 }
76+ shardAssignedInBackground = false
7177 }
7278
7379 // If the state has changed we need to update heartbeat data.
7480 // Otherwise, we want to do it with controlled frequency - at most every _heartbeatRefreshRate.
7581 if previousHeartbeat != nil && request .Status == previousHeartbeat .Status && mode == types .MigrationModeONBOARDED {
7682 lastHeartbeatTime := previousHeartbeat .LastHeartbeat
77- if now .Sub (lastHeartbeatTime ) < _heartbeatRefreshRate {
83+ if heartbeatTime .Sub (lastHeartbeatTime ) < _heartbeatRefreshRate {
7884 return _convertResponse (assignedShards , mode ), nil
7985 }
8086 }
8187
8288 newHeartbeat := store.HeartbeatState {
83- LastHeartbeat : now ,
89+ LastHeartbeat : heartbeatTime ,
8490 Status : request .Status ,
8591 ReportedShards : request .ShardStatusReports ,
8692 Metadata : request .GetMetadata (),
@@ -95,9 +101,58 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe
95101 return nil , & types.InternalServiceError {Message : fmt .Sprintf ("failed to record heartbeat: %v" , err )}
96102 }
97103
104+ // emit shard assignment metrics only if shards are assigned in the background
105+ // shard assignment in heartbeat doesn't involve any assignment changes happening in the background
106+ // thus there was no shard handover and no assignment distribution latency
107+ // to measure, so don't need to emit metrics in that case
108+ if shardAssignedInBackground {
109+ // emits metrics in background to not block the heartbeat response
110+ h .emitShardAssignmentMetrics (request .Namespace , heartbeatTime , previousHeartbeat , assignedShards )
111+ }
112+
98113 return _convertResponse (assignedShards , mode ), nil
99114}
100115
116+ // emitShardAssignmentMetrics emits the following metrics for newly assigned shards:
117+ // - ShardAssignmentDistributionLatency: time taken since the shard was assigned to heartbeat time
118+ // - ShardHandoverLatency: time taken since the previous executor's last heartbeat to heartbeat time
119+ func (h * executor ) emitShardAssignmentMetrics (namespace string , heartbeatTime time.Time , previousHeartbeat * store.HeartbeatState , assignedState * store.AssignedState ) {
120+ // find newly assigned shards, if there are none, no handovers happened
121+ newAssignedShardIDs := filterNewlyAssignedShardIDs (previousHeartbeat , assignedState )
122+ if len (newAssignedShardIDs ) == 0 {
123+ // no handovers happened, nothing to do
124+ return
125+ }
126+
127+ metricsScope := h .metricsClient .Scope (metrics .ShardDistributorHeartbeatScope ).
128+ Tagged (metrics .NamespaceTag (namespace ))
129+
130+ distributionLatency := heartbeatTime .Sub (assignedState .LastUpdated )
131+ metricsScope .RecordHistogramDuration (metrics .ShardDistributorShardAssignmentDistributionLatency , distributionLatency )
132+
133+ // check if handover stats exist at all
134+ isShardHandoverStatsExists := assignedState .ShardHandoverStats != nil
135+
136+ for _ , shardID := range newAssignedShardIDs {
137+ if ! isShardHandoverStatsExists {
138+ // no handover stats at all, means no handovers happened before
139+ continue
140+ }
141+
142+ handoverStats , ok := assignedState .ShardHandoverStats [shardID ]
143+ if ! ok {
144+ // no handover stats for this shard, means it was never handed over before
145+ // so no handover latency metric to emit
146+ continue
147+ }
148+
149+ handoverLatency := heartbeatTime .Sub (handoverStats .PreviousExecutorLastHeartbeatTime )
150+ metricsScope .Tagged (metrics .HandoverTypeTag (handoverStats .HandoverType .String ())).
151+ RecordHistogramDuration (metrics .ShardDistributorShardHandoverLatency , handoverLatency )
152+
153+ }
154+ }
155+
101156// assignShardsInCurrentHeartbeat is used during the migration phase to assign the shards to the executors according to what is reported during the heartbeat
102157func (h * executor ) assignShardsInCurrentHeartbeat (ctx context.Context , request * types.ExecutorHeartbeatRequest ) (* store.AssignedState , error ) {
103158 assignedShards := store.AssignedState {
@@ -155,3 +210,33 @@ func validateMetadata(metadata map[string]string) error {
155210
156211 return nil
157212}
213+
214+ func filterNewlyAssignedShardIDs (previousHeartbeat * store.HeartbeatState , assignedState * store.AssignedState ) []string {
215+ // if assignedState is nil, no shards are assigned
216+ if assignedState == nil || len (assignedState .AssignedShards ) == 0 {
217+ return nil
218+ }
219+
220+ // if previousHeartbeat is nil, all assigned shards are new
221+ if previousHeartbeat == nil {
222+ var newAssignedShardIDs = make ([]string , len (assignedState .AssignedShards ))
223+
224+ var i int
225+ for assignedShardID := range assignedState .AssignedShards {
226+ newAssignedShardIDs [i ] = assignedShardID
227+ i ++
228+ }
229+
230+ return newAssignedShardIDs
231+ }
232+
233+ // find shards that are assigned now but were not reported in the previous heartbeat
234+ var newAssignedShardIDs []string
235+ for assignedShardID := range assignedState .AssignedShards {
236+ if _ , ok := previousHeartbeat .ReportedShards [assignedShardID ]; ! ok {
237+ newAssignedShardIDs = append (newAssignedShardIDs , assignedShardID )
238+ }
239+ }
240+
241+ return newAssignedShardIDs
242+ }
0 commit comments