Skip to content

Commit fb1e34c

Browse files
authored
Merge branch 'main' into 3318-RavenDB-state-store-new
2 parents 6cf4257 + a69ae81 commit fb1e34c

File tree

3 files changed

+23
-3
lines changed

3 files changed

+23
-3
lines changed

pubsub/aws/snssqs/subscription_mgmt.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,11 @@ func (sm *SubscriptionManager) queueConsumerController(queueConsumerCbk func(con
118118

119119
sm.lock.Unlock()
120120
case <-sm.closeCh:
121-
return
121+
if sm.topicsHandlers.Size() == 0 {
122+
return
123+
} else {
124+
sm.logger.Info("Shutdown initiated, waiting for all subscriptions to be cleaned up")
125+
}
122126
}
123127
}
124128
}

tests/certification/bindings/zeebe/command/create_instance_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@ func TestCreateInstanceOperation(t *testing.T) {
290290
flow.New(t, "Test create instance operation (async)").
291291
Step(dockercompose.Run("zeebe", zeebe_test.DockerComposeYaml)).
292292
Step("Waiting for Zeebe Readiness...", retry.Do(time.Second*3, 10, zeebe_test.CheckZeebeConnection)).
293+
Step(app.Run("workerApp", fmt.Sprintf(":%d", appPort), workers(0))).
293294
Step(sidecar.Run(zeebe_test.SidecarName,
294295
append(componentRuntimeOptions(),
295296
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
@@ -298,7 +299,6 @@ func TestCreateInstanceOperation(t *testing.T) {
298299
embedded.WithResourcesPath("components/standard"),
299300
)...,
300301
)).
301-
Step(app.Run("workerApp", fmt.Sprintf(":%d", appPort), workers(0))).
302302
Step("Waiting for the component to start", flow.Sleep(10*time.Second)).
303303
Step("Deploy process in version 1", deployVersion1).
304304
Step("Deploy process in version 2", deployVersion2).
@@ -314,6 +314,7 @@ func TestCreateInstanceOperation(t *testing.T) {
314314
flow.New(t, "Test create instance operation (sync)").
315315
Step(dockercompose.Run("zeebe", zeebe_test.DockerComposeYaml)).
316316
Step("Waiting for Zeebe Readiness...", retry.Do(time.Second*3, 10, zeebe_test.CheckZeebeConnection)).
317+
Step(app.Run("workerApp", fmt.Sprintf(":%d", appPort), workers(20*time.Second))).
317318
Step(sidecar.Run(zeebe_test.SidecarName,
318319
append(componentRuntimeOptions(),
319320
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
@@ -322,7 +323,6 @@ func TestCreateInstanceOperation(t *testing.T) {
322323
embedded.WithResourcesPath("components/syncProcessCreation"),
323324
)...,
324325
)).
325-
Step(app.Run("workerApp", fmt.Sprintf(":%d", appPort), workers(20*time.Second))).
326326
Step("Waiting for the component to start", flow.Sleep(10*time.Second)).
327327
Step("Deploy process in version 1", deployVersion1).
328328
Step("Deploy process in version 2", deployVersion2).

tests/certification/flow/sidecar/sidecar.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"sync/atomic"
1919
"time"
2020

21+
"github.com/cenkalti/backoff"
2122
"github.com/dapr/dapr/pkg/runtime"
2223
"github.com/dapr/dapr/pkg/runtime/registry"
2324
"github.com/dapr/kit/logger"
@@ -133,6 +134,21 @@ func (s Sidecar) Start(ctx flow.Context) error {
133134
options.clientCallback(&client)
134135
}
135136

137+
// Wait for the sidecar to be healthy
138+
var bo backoff.BackOff = backoff.NewConstantBackOff(100 * time.Millisecond)
139+
bo = backoff.WithMaxRetries(bo, 200) // 20 seconds
140+
bo = backoff.WithContext(bo, ctx)
141+
retryErr := backoff.Retry(func() error {
142+
if !rtConf.Healthz.IsReady() {
143+
return fmt.Errorf("sidecar is not ready")
144+
}
145+
return nil
146+
}, bo)
147+
148+
if retryErr != nil {
149+
return retryErr
150+
}
151+
136152
return nil
137153
}
138154

0 commit comments

Comments
 (0)