@@ -21,6 +21,7 @@ import (
2121
2222 helm "helm.sh/helm/v3/pkg/action"
2323 "helm.sh/helm/v3/pkg/chart"
24+ "helm.sh/helm/v3/pkg/release"
2425 "k8s.io/helm/pkg/strvals"
2526
2627 "github.com/hashicorp/go-version"
@@ -56,6 +57,16 @@ type UpgradeConfig struct {
5657 ImageVariant string
5758}
5859
60+ // UpgradeOptions represents options for the upgrade function.
61+ type UpgradeOptions struct {
62+ WithRetry bool
63+ MaxRetries int
64+ RetryInterval time.Duration
65+ }
66+
67+ // UpgradeOption is a functional option type for configuring upgrade.
68+ type UpgradeOption func (* UpgradeOptions )
69+
5970func Upgrade (conf UpgradeConfig ) error {
6071 helmRepo := utils .GetEnv ("DAPR_HELM_REPO_URL" , daprHelmRepo )
6172 status , err := GetDaprResourcesStatus ()
@@ -71,14 +82,14 @@ func Upgrade(conf UpgradeConfig) error {
7182 return err
7283 }
7384
74- helmConf , err := helmConfig (status [0 ].Namespace )
85+ upgradeClient , helmConf , err := newUpgradeClient (status [0 ].Namespace , conf )
7586 if err != nil {
76- return err
87+ return fmt . Errorf ( "unable to create helm client: %w" , err )
7788 }
7889
7990 controlPlaneChart , err := getHelmChart (conf .RuntimeVersion , "dapr" , helmRepo , helmConf )
8091 if err != nil {
81- return err
92+ return fmt . Errorf ( "unable to get helm chart: %w" , err )
8293 }
8394
8495 willHaveDashboardInDaprChart , err := IsDashboardIncluded (conf .RuntimeVersion )
@@ -116,13 +127,6 @@ func Upgrade(conf UpgradeConfig) error {
116127 }
117128 }
118129
119- upgradeClient := helm .NewUpgrade (helmConf )
120- upgradeClient .ResetValues = true
121- upgradeClient .Namespace = status [0 ].Namespace
122- upgradeClient .CleanupOnFail = true
123- upgradeClient .Wait = true
124- upgradeClient .Timeout = time .Duration (conf .Timeout ) * time .Second
125-
126130 print .InfoStatusEvent (os .Stdout , "Starting upgrade..." )
127131
128132 mtls , err := IsMTLSEnabled ()
@@ -155,7 +159,7 @@ func Upgrade(conf UpgradeConfig) error {
155159 if ! isDowngrade (conf .RuntimeVersion , daprVersion ) {
156160 err = applyCRDs (fmt .Sprintf ("v%s" , conf .RuntimeVersion ))
157161 if err != nil {
158- return err
162+ return fmt . Errorf ( "unable to apply CRDs: %w" , err )
159163 }
160164 } else {
161165 print .InfoStatusEvent (os .Stdout , "Downgrade detected, skipping CRDs." )
@@ -166,8 +170,13 @@ func Upgrade(conf UpgradeConfig) error {
166170 return err
167171 }
168172
169- if _ , err = upgradeClient .Run (chart , controlPlaneChart , vals ); err != nil {
170- return err
173+ // Deal with known race condition when applying both CRD and CR close together. The Helm upgrade fails
174+ // when a CR is applied tries to be applied before the CRD is fully registered. On each retry we need a
175+ // fresh client since the kube client locally caches the last OpenAPI schema it received from the server.
176+ // See https://github.com/kubernetes/kubectl/issues/1179
177+ _ , err = helmUpgrade (upgradeClient , chart , controlPlaneChart , vals , WithRetry (5 , 100 * time .Millisecond ))
178+ if err != nil {
179+ return fmt .Errorf ("failure while running upgrade: %w" , err )
171180 }
172181
173182 if dashboardChart != nil {
@@ -192,6 +201,55 @@ func Upgrade(conf UpgradeConfig) error {
192201 return nil
193202}
194203
204+ // WithRetry enables retry with the specified max retries and retry interval.
205+ func WithRetry (maxRetries int , retryInterval time.Duration ) UpgradeOption {
206+ return func (o * UpgradeOptions ) {
207+ o .WithRetry = true
208+ o .MaxRetries = maxRetries
209+ o .RetryInterval = retryInterval
210+ }
211+ }
212+
213+ func helmUpgrade (client * helm.Upgrade , name string , chart * chart.Chart , vals map [string ]interface {}, options ... UpgradeOption ) (* release.Release , error ) {
214+ upgradeOptions := & UpgradeOptions {
215+ WithRetry : false ,
216+ MaxRetries : 0 ,
217+ RetryInterval : 0 ,
218+ }
219+
220+ // Apply functional options.
221+ for _ , option := range options {
222+ option (upgradeOptions )
223+ }
224+
225+ var release * release.Release
226+ for attempt := 1 ; ; attempt ++ {
227+ _ , err := client .Run (name , chart , vals )
228+ if err == nil {
229+ // operation succeeded, no need to retry.
230+ break
231+ }
232+
233+ if ! upgradeOptions .WithRetry || attempt >= upgradeOptions .MaxRetries {
234+ // If not retrying or reached max retries, return the error.
235+ return nil , fmt .Errorf ("max retries reached, unable to run command: %w" , err )
236+ }
237+
238+ print .PendingStatusEvent (os .Stdout , "Retrying after %s..." , upgradeOptions .RetryInterval )
239+ time .Sleep (upgradeOptions .RetryInterval )
240+
241+ // create a totally new helm client, this ensures that we fetch a fresh openapi schema from the server on each attempt.
242+ client , _ , err = newUpgradeClient (client .Namespace , UpgradeConfig {
243+ Timeout : uint (client .Timeout ),
244+ })
245+ if err != nil {
246+ return nil , fmt .Errorf ("unable to create helm client: %w" , err )
247+ }
248+ }
249+
250+ return release , nil
251+ }
252+
195253func highAvailabilityEnabled (status []StatusOutput ) bool {
196254 for _ , s := range status {
197255 if s .Name == "dapr-dashboard" {
@@ -264,3 +322,19 @@ func isDowngrade(targetVersion, existingVersion string) bool {
264322 }
265323 return target .LessThan (existing )
266324}
325+
326+ func newUpgradeClient (namespace string , cfg UpgradeConfig ) (* helm.Upgrade , * helm.Configuration , error ) {
327+ helmCfg , err := helmConfig (namespace )
328+ if err != nil {
329+ return nil , nil , err
330+ }
331+
332+ client := helm .NewUpgrade (helmCfg )
333+ client .ResetValues = true
334+ client .Namespace = namespace
335+ client .CleanupOnFail = true
336+ client .Wait = true
337+ client .Timeout = time .Duration (cfg .Timeout ) * time .Second
338+
339+ return client , helmCfg , nil
340+ }
0 commit comments