Skip to content

Commit c7a1fcc

Browse files
authored
Gradual config rollouts (#8922)
## What changed? Support gradual config rollouts by key (e.g. task queue partition). ## Why? Rolling out changes slowly, and also mostly-synchronized across processes. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [x] covered by existing tests - [x] added new unit test(s) - [ ] added new functional test(s) ## Potential risks Time-based changes can be risky if you set something for the future and forget about it. We should have validation that prevents changes from being set too far in the future or outside of business hours.
1 parent de358b8 commit c7a1fcc

File tree

9 files changed

+653
-22
lines changed

9 files changed

+653
-22
lines changed

common/dynamicconfig/collection.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,11 @@ var (
9999
errKeyNotPresent = errors.New("key not present")
100100
errNoMatchingConstraint = errors.New("no matching constraint in key")
101101

102-
protoEnumType = reflect.TypeOf((*protoreflect.Enum)(nil)).Elem()
103-
errorType = reflect.TypeOf((*error)(nil)).Elem()
104-
durationType = reflect.TypeOf(time.Duration(0))
105-
stringType = reflect.TypeOf("")
102+
protoEnumType = reflect.TypeFor[protoreflect.Enum]()
103+
errorType = reflect.TypeFor[error]()
104+
durationType = reflect.TypeFor[time.Duration]()
105+
timeType = reflect.TypeFor[time.Time]()
106+
stringType = reflect.TypeFor[string]()
106107

107108
usingDefaultValue any = defaultValue{}
108109
)
@@ -689,6 +690,7 @@ func ConvertStructure[T any](def T) func(v any) (T, error) {
689690
Result: &out,
690691
DecodeHook: mapstructure.ComposeDecodeHookFunc(
691692
mapstructureHookDuration,
693+
mapstructureHookTimestamp,
692694
mapstructureHookProtoEnum,
693695
mapstructureHookGeneric,
694696
),
@@ -710,6 +712,31 @@ func mapstructureHookDuration(f, t reflect.Type, data any) (any, error) {
710712
return convertDuration(data)
711713
}
712714

715+
// Parses string or int into time.Time.
716+
func mapstructureHookTimestamp(f, t reflect.Type, data any) (any, error) {
717+
if t != timeType {
718+
return data, nil
719+
}
720+
switch v := data.(type) {
721+
case time.Time:
722+
return v, nil
723+
case string:
724+
ts, err := time.Parse(time.RFC3339, v)
725+
if err != nil {
726+
return time.Time{}, fmt.Errorf("failed to parse time: %v", err)
727+
}
728+
return ts, nil
729+
}
730+
// treat numeric values as seconds
731+
if ival, err := convertInt(data); err == nil {
732+
return time.Unix(int64(ival), 0), nil
733+
} else if fval, err := convertFloat(data); err == nil {
734+
ipart, fpart := math.Modf(fval)
735+
return time.Unix(int64(ipart), int64(fpart*float64(time.Second))), nil
736+
}
737+
return time.Time{}, errors.New("value not convertible to Time")
738+
}
739+
713740
// Parses proto enum values from strings.
714741
func mapstructureHookProtoEnum(f, t reflect.Type, data any) (any, error) {
715742
if f != stringType || !t.Implements(protoEnumType) {

common/dynamicconfig/constants.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1331,14 +1331,16 @@ a decision to scale down the number of pollers will be issued`,
13311331
`MatchingPollerScalingDecisionsPerSecond is the maximum number of scaling decisions that will be issued per
13321332
second per poller by one physical queue manager`,
13331333
)
1334-
MatchingUseNewMatcher = NewTaskQueueBoolSetting(
1334+
MatchingUseNewMatcher = NewTaskQueueTypedSettingWithConverter(
13351335
"matching.useNewMatcher",
1336-
false,
1336+
ConvertGradualChange(false),
1337+
StaticGradualChange(false),
13371338
`Use priority-enabled TaskMatcher`,
13381339
)
1339-
MatchingEnableFairness = NewTaskQueueBoolSetting(
1340+
MatchingEnableFairness = NewTaskQueueTypedSettingWithConverter(
13401341
"matching.enableFairness",
1341-
false,
1342+
ConvertGradualChange(false),
1343+
StaticGradualChange(false),
13421344
`Enable fairness for task dispatching. Implies matching.useNewMatcher.`,
13431345
)
13441346
MatchingEnableMigration = NewTaskQueueBoolSetting(

common/dynamicconfig/deepcopy.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package dynamicconfig
33
import (
44
"fmt"
55
"reflect"
6+
"time"
67
)
78

89
// deepCopyForMapstructure does a simple deep copy of T. Fancy cases (anything other than plain old data)
@@ -50,6 +51,15 @@ func deepCopyValue(v reflect.Value) reflect.Value {
5051
}
5152
return nv
5253
case reflect.Struct:
54+
// Special case for time.Time: it has unexported fields so we can't copy it field by
55+
// field, but we can copy zero values (which is all we need for default values).
56+
if v.Type() == reflect.TypeFor[time.Time]() {
57+
if v.Interface().(time.Time).IsZero() {
58+
return reflect.ValueOf(time.Time{})
59+
}
60+
// nolint:forbidigo // this will be triggered from a static initializer before it can be triggered from production code
61+
panic(fmt.Sprintf("Can't deep copy non-zero time.Time: %v", v.Interface()))
62+
}
5363
nv := reflect.New(v.Type()).Elem()
5464
for i := range v.Type().NumField() {
5565
nv.Field(i).Set(deepCopyValue(v.Field(i)))
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
package dynamicconfig
2+
3+
import (
4+
"math"
5+
"reflect"
6+
"sync"
7+
"time"
8+
9+
"github.com/dgryski/go-farm"
10+
"go.temporal.io/server/common/clock"
11+
)
12+
13+
// GradualChange represents a setting that can change its value over time in a controlled way.
14+
// The value of a GradualChange is a function of a key and the current time. Before Start, the
15+
// value is always Old for all keys, and after End it's always New. Between them, each key will
16+
// change once at a specific time (returned by When).
17+
//
18+
// A setting with type GradualChange can use ConvertGradualChange as a conversion function to
19+
// handle plain values of the underlying type as well as GradualChange structs.
20+
type GradualChange[T any] struct {
21+
Old, New T
22+
Start, End time.Time
23+
}
24+
25+
// StaticGradualChange returns a GradualChange whose Value always returns def and whose When
26+
// always returns a time in the past.
27+
func StaticGradualChange[T any](def T) GradualChange[T] {
28+
return GradualChange[T]{New: def}
29+
}
30+
31+
// Value returns the value for the given key at the given time.
32+
func (c *GradualChange[T]) Value(key []byte, now time.Time) T {
33+
if !now.Before(c.End) {
34+
return c.New
35+
} else if !now.After(c.Start) {
36+
return c.Old
37+
}
38+
fraction := float64(now.Sub(c.Start)) / float64(c.End.Sub(c.Start))
39+
threshold := uint32(fraction * float64(math.MaxUint32))
40+
if farm.Fingerprint32(key) < threshold {
41+
return c.New
42+
}
43+
return c.Old
44+
}
45+
46+
// When returns the time when the value for key will switch from old to new. It may be the zero
47+
// time for a static GradualChange.
48+
func (c *GradualChange[T]) When(key []byte) time.Time {
49+
fraction := float64(farm.Fingerprint32(key)) / float64(math.MaxUint32)
50+
when := time.Duration(fraction * float64(c.End.Sub(c.Start)))
51+
return c.Start.Add(when)
52+
}
53+
54+
// ConvertGradualChange is a conversion function that can handle a plain T (which represents a
55+
// static value) as well as a GradualChange[T]. It can be used to turn settings that were not
56+
// of type GradualChange into a GradualChange.
57+
// nolint:revive // cognitive-complexity // this looks complicated but each case is fairly simple
58+
func ConvertGradualChange[T any](def T) func(v any) (GradualChange[T], error) {
59+
changeConverter := ConvertStructure(StaticGradualChange(def))
60+
61+
// Call this once so that if it's going to panic, it panics at static init time.
62+
_, _ = changeConverter(nil)
63+
64+
switch reflect.TypeFor[T]() {
65+
case reflect.TypeFor[bool]():
66+
return func(v any) (GradualChange[T], error) {
67+
if b, err := convertBool(v); err == nil {
68+
var change GradualChange[T]
69+
reflect.ValueOf(&change.New).Elem().SetBool(b)
70+
return change, nil
71+
}
72+
return changeConverter(v)
73+
}
74+
case reflect.TypeFor[int]():
75+
return func(v any) (GradualChange[T], error) {
76+
if i, err := convertInt(v); err == nil {
77+
var change GradualChange[T]
78+
reflect.ValueOf(&change.New).Elem().SetInt(int64(i))
79+
return change, nil
80+
}
81+
return changeConverter(v)
82+
}
83+
case reflect.TypeFor[float64]():
84+
return func(v any) (GradualChange[T], error) {
85+
if f, err := convertFloat(v); err == nil {
86+
var change GradualChange[T]
87+
reflect.ValueOf(&change.New).Elem().SetFloat(f)
88+
return change, nil
89+
}
90+
return changeConverter(v)
91+
}
92+
case reflect.TypeFor[string]():
93+
return func(v any) (GradualChange[T], error) {
94+
if s, err := convertString(v); err == nil {
95+
var change GradualChange[T]
96+
reflect.ValueOf(&change.New).Elem().SetString(s)
97+
return change, nil
98+
}
99+
return changeConverter(v)
100+
}
101+
case reflect.TypeFor[time.Duration]():
102+
return func(v any) (GradualChange[T], error) {
103+
if d, err := convertDuration(v); err == nil {
104+
var change GradualChange[T]
105+
reflect.ValueOf(&change.New).Elem().SetInt(int64(d))
106+
return change, nil
107+
}
108+
return changeConverter(v)
109+
}
110+
default:
111+
// nolint:forbidigo // this will be triggered from a static initializer before it can be triggered from production code
112+
panic("ConvertGradualChange can only be used with scalar types for now")
113+
}
114+
}
115+
116+
// SubscribeGradualChange is a helper that allows subscribing to a GradualChange[T] as if it
117+
// was a T. It handles setting a timer for when the setting may change in the future.
118+
func SubscribeGradualChange[T any](
119+
subscribable TypedSubscribable[GradualChange[T]],
120+
changeKey []byte,
121+
callback func(T),
122+
timeSource clock.TimeSource,
123+
) (T, func()) {
124+
w := &gradualChangeSubscribeWrapper[T]{changeKey: changeKey, callback: callback, clock: timeSource}
125+
126+
w.lock.Lock()
127+
w.change, w.cancelSub = subscribable(w.changeCallback)
128+
val, _ := w.reevalLocked()
129+
w.lock.Unlock()
130+
131+
return val, w.cancel
132+
}
133+
134+
type gradualChangeSubscribeWrapper[T any] struct {
135+
// constant:
136+
changeKey []byte
137+
callback func(T)
138+
clock clock.TimeSource
139+
// mutable, protected by lock:
140+
lock sync.Mutex
141+
cancelSub func()
142+
change GradualChange[T]
143+
val T
144+
tmr clock.Timer
145+
}
146+
147+
func (w *gradualChangeSubscribeWrapper[T]) changeCallback(change GradualChange[T]) {
148+
w.lock.Lock()
149+
w.change = change
150+
val, changed := w.reevalLocked()
151+
w.lock.Unlock()
152+
153+
if changed {
154+
w.callback(val)
155+
}
156+
}
157+
158+
func (w *gradualChangeSubscribeWrapper[T]) timerCallback() {
159+
w.lock.Lock()
160+
val, changed := w.reevalLocked()
161+
w.lock.Unlock()
162+
163+
if changed {
164+
w.callback(val)
165+
}
166+
}
167+
168+
func (w *gradualChangeSubscribeWrapper[T]) cancel() {
169+
w.lock.Lock()
170+
defer w.lock.Unlock()
171+
172+
w.cancelSub()
173+
w.setTimerLocked(nil)
174+
}
175+
176+
func (w *gradualChangeSubscribeWrapper[T]) reevalLocked() (T, bool) {
177+
now := w.clock.Now()
178+
179+
var newTmr clock.Timer
180+
if at := w.change.When(w.changeKey); at.After(now) {
181+
newTmr = w.clock.AfterFunc(at.Sub(now), w.timerCallback)
182+
}
183+
w.setTimerLocked(newTmr)
184+
185+
newVal := w.change.Value(w.changeKey, now)
186+
changed := !reflect.DeepEqual(w.val, newVal)
187+
w.val = newVal
188+
return w.val, changed
189+
}
190+
191+
func (w *gradualChangeSubscribeWrapper[T]) setTimerLocked(newTmr clock.Timer) {
192+
if w.tmr != nil {
193+
w.tmr.Stop()
194+
}
195+
w.tmr = newTmr
196+
}

0 commit comments

Comments
 (0)