Skip to content

Commit 6a4ab3d

Browse files
authored
feat(shard-distributor): refactor time handling, data store structures, key building in etcd (#7447)
<!-- Describe what has changed in this PR --> **What changed?** * `RFC3339Nano` has been introduced for time storing to increase precision for time * `etcdtypes` package has been created for data structures stored in etcd * `etcdkeys` package has been updated, redundant error returns have been removed from build functions, comments have been added, new constants have been added <!-- Tell your future self why have you made these changes --> **Why?** * Previously, `int64` was used to save time in Unix seconds format. However, this precision was not enough for measuring latencies. A new time type uses `RFC3339Nano` `2006-01-02T15:04:05.999999999Z07:00` and has been introduced to keep consistency during time handling in etcd. <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** * Unit tests * Integration tests <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Is there any documentation updates should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open an PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes**
1 parent b6085f2 commit 6a4ab3d

File tree

18 files changed

+898
-311
lines changed

18 files changed

+898
-311
lines changed

common/types/sharddistributor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ package types
2424

2525
import "fmt"
2626

27-
//go:generate enumer -type=ExecutorStatus,ShardStatus,AssignmentStatus,MigrationMode -json -output sharddistributor_statuses_enumer_generated.go
27+
//go:generate enumer -type=ExecutorStatus,ShardStatus,AssignmentStatus,MigrationMode -trimprefix=ExecutorStatus,ShardStatus,AssignmentStatus,MigrationMode -json -output sharddistributor_statuses_enumer_generated.go
2828

2929
type GetShardOwnerRequest struct {
3030
ShardKey string
@@ -198,7 +198,7 @@ func (v *ExecutorHeartbeatResponse) GetMigrationPhase() (o MigrationMode) {
198198
}
199199

200200
type ShardAssignment struct {
201-
Status AssignmentStatus
201+
Status AssignmentStatus `json:"status"`
202202
}
203203

204204
func (v *ShardAssignment) GetStatus() (o AssignmentStatus) {

common/types/sharddistributor_statuses_enumer_generated.go

Lines changed: 55 additions & 55 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/sharddistributor/handler/executor.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,14 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe
7373
// If the state has changed we need to update heartbeat data.
7474
// Otherwise, we want to do it with controlled frequency - at most every _heartbeatRefreshRate.
7575
if previousHeartbeat != nil && request.Status == previousHeartbeat.Status && mode == types.MigrationModeONBOARDED {
76-
lastHeartbeatTime := time.Unix(previousHeartbeat.LastHeartbeat, 0)
76+
lastHeartbeatTime := previousHeartbeat.LastHeartbeat
7777
if now.Sub(lastHeartbeatTime) < _heartbeatRefreshRate {
7878
return _convertResponse(assignedShards, mode), nil
7979
}
8080
}
8181

8282
newHeartbeat := store.HeartbeatState{
83-
LastHeartbeat: now.Unix(),
83+
LastHeartbeat: now,
8484
Status: request.Status,
8585
ReportedShards: request.ShardStatusReports,
8686
Metadata: request.GetMetadata(),
@@ -102,7 +102,7 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe
102102
func (h *executor) assignShardsInCurrentHeartbeat(ctx context.Context, request *types.ExecutorHeartbeatRequest) (*store.AssignedState, error) {
103103
assignedShards := store.AssignedState{
104104
AssignedShards: make(map[string]*types.ShardAssignment),
105-
LastUpdated: h.timeSource.Now().Unix(),
105+
LastUpdated: h.timeSource.Now().UTC(),
106106
ModRevision: int64(0),
107107
}
108108
err := h.storage.DeleteExecutors(ctx, request.GetNamespace(), []string{request.GetExecutorID()}, store.NopGuard())

service/sharddistributor/handler/executor_test.go

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func TestHeartbeat(t *testing.T) {
4141

4242
mockStore.EXPECT().GetHeartbeat(gomock.Any(), namespace, executorID).Return(nil, nil, store.ErrExecutorNotFound)
4343
mockStore.EXPECT().RecordHeartbeat(gomock.Any(), namespace, executorID, store.HeartbeatState{
44-
LastHeartbeat: now.Unix(),
44+
LastHeartbeat: now,
4545
Status: types.ExecutorStatusACTIVE,
4646
})
4747

@@ -65,7 +65,7 @@ func TestHeartbeat(t *testing.T) {
6565
}
6666

6767
previousHeartbeat := store.HeartbeatState{
68-
LastHeartbeat: now.Unix(),
68+
LastHeartbeat: now,
6969
Status: types.ExecutorStatusACTIVE,
7070
}
7171

@@ -91,7 +91,7 @@ func TestHeartbeat(t *testing.T) {
9191
}
9292

9393
previousHeartbeat := store.HeartbeatState{
94-
LastHeartbeat: now.Unix(),
94+
LastHeartbeat: now,
9595
Status: types.ExecutorStatusACTIVE,
9696
}
9797

@@ -100,7 +100,7 @@ func TestHeartbeat(t *testing.T) {
100100

101101
mockStore.EXPECT().GetHeartbeat(gomock.Any(), namespace, executorID).Return(&previousHeartbeat, nil, nil)
102102
mockStore.EXPECT().RecordHeartbeat(gomock.Any(), namespace, executorID, store.HeartbeatState{
103-
LastHeartbeat: mockTimeSource.Now().Unix(),
103+
LastHeartbeat: mockTimeSource.Now().UTC(),
104104
Status: types.ExecutorStatusACTIVE,
105105
})
106106

@@ -124,13 +124,13 @@ func TestHeartbeat(t *testing.T) {
124124
}
125125

126126
previousHeartbeat := store.HeartbeatState{
127-
LastHeartbeat: now.Unix(),
127+
LastHeartbeat: now,
128128
Status: types.ExecutorStatusACTIVE,
129129
}
130130

131131
mockStore.EXPECT().GetHeartbeat(gomock.Any(), namespace, executorID).Return(&previousHeartbeat, nil, nil)
132132
mockStore.EXPECT().RecordHeartbeat(gomock.Any(), namespace, executorID, store.HeartbeatState{
133-
LastHeartbeat: now.Unix(),
133+
LastHeartbeat: now,
134134
Status: types.ExecutorStatusDRAINING,
135135
})
136136

@@ -178,7 +178,7 @@ func TestHeartbeat(t *testing.T) {
178178
Status: types.ExecutorStatusACTIVE,
179179
}
180180
previousHeartbeat := store.HeartbeatState{
181-
LastHeartbeat: now.Unix(),
181+
LastHeartbeat: now,
182182
Status: types.ExecutorStatusACTIVE,
183183
}
184184

@@ -207,7 +207,7 @@ func TestHeartbeat(t *testing.T) {
207207
Status: types.ExecutorStatusACTIVE,
208208
}
209209
previousHeartbeat := store.HeartbeatState{
210-
LastHeartbeat: now.Unix(),
210+
LastHeartbeat: now,
211211
Status: types.ExecutorStatusACTIVE,
212212
}
213213

@@ -240,7 +240,7 @@ func TestHeartbeat(t *testing.T) {
240240
}
241241

242242
previousHeartbeat := store.HeartbeatState{
243-
LastHeartbeat: now.Unix(),
243+
LastHeartbeat: now,
244244
Status: types.ExecutorStatusACTIVE,
245245
ReportedShards: map[string]*types.ShardStatusReport{
246246
"shard1": {Status: types.ShardStatusREADY, ShardLoad: 1.0},
@@ -269,13 +269,14 @@ func TestHeartbeat(t *testing.T) {
269269
return nil
270270
},
271271
)
272-
mockStore.EXPECT().RecordHeartbeat(gomock.Any(), namespace, executorID, store.HeartbeatState{
273-
LastHeartbeat: now.Unix(),
274-
Status: types.ExecutorStatusACTIVE,
275-
ReportedShards: map[string]*types.ShardStatusReport{
276-
"shard0": {Status: types.ShardStatusREADY, ShardLoad: 1.0},
272+
mockStore.EXPECT().RecordHeartbeat(gomock.Any(), namespace, executorID, gomock.AssignableToTypeOf(store.HeartbeatState{})).DoAndReturn(
273+
func(_ context.Context, _ string, _ string, hb store.HeartbeatState) error {
274+
// Validate status and reported shards, ignore exact timestamp
275+
require.Equal(t, types.ExecutorStatusACTIVE, hb.Status)
276+
require.Contains(t, hb.ReportedShards, "shard0")
277+
return nil
277278
},
278-
})
279+
)
279280

280281
_, err := handler.Heartbeat(ctx, req)
281282
require.NoError(t, err)
@@ -303,7 +304,7 @@ func TestHeartbeat(t *testing.T) {
303304
}
304305

305306
previousHeartbeat := store.HeartbeatState{
306-
LastHeartbeat: now.Unix(),
307+
LastHeartbeat: now,
307308
Status: types.ExecutorStatusACTIVE,
308309
ReportedShards: map[string]*types.ShardStatusReport{
309310
"shard1": {Status: types.ShardStatusREADY, ShardLoad: 1.0},
@@ -346,7 +347,7 @@ func TestHeartbeat(t *testing.T) {
346347
}
347348

348349
previousHeartbeat := store.HeartbeatState{
349-
LastHeartbeat: now.Unix(),
350+
LastHeartbeat: now,
350351
Status: types.ExecutorStatusACTIVE,
351352
ReportedShards: map[string]*types.ShardStatusReport{
352353
"shard1": {Status: types.ShardStatusREADY, ShardLoad: 1.0},

0 commit comments

Comments
 (0)